diff --git a/README.md b/README.md index 20a8fa5..ed9dbeb 100644 --- a/README.md +++ b/README.md @@ -6,12 +6,12 @@ Unlock the full potential of your database integrating ReadySet and ProxySQL by This scheduler executes the following steps: 1. Locks an in disk file (configured by `lock_file`) to avoid multiple instances of the scheduler to overlap their execution. -2. Check if it can connect to Readyset and validate if `Snapshot Status` is `Completed`. In case it cannot connect or Readyset is still performing snapshot it adjust the server status to `SHUNNED` in ProxySQL. -2. Queries the table `stats_mysql_query_digest` from ProxySQL and validates if each query is supported by Readyset +2. If `mode=(All|HealthCheck)`, query `mysql_servers` and select every server with `comment='Readyset'` (case insensitive) and `hostgroup=readyset_hostgroup`. For each server, check whether it can connect to Readyset and whether `Snapshot Status` is `Completed`. If it cannot connect, or Readyset is still snapshotting, set the server status to `SHUNNED` in ProxySQL. +3. If `mode=(All|QueryDiscovery)`, query the table `stats_mysql_query_digest` for queries executed against `source_hostgroup` by `readyset_user` and validate whether each query is supported by Readyset. The order in which queries are considered is configured by the [Query Discovery](#query-discovery) settings. -3. If the query is supported it adds a cache in Readyset by executing `CREATE CACHE FROM __query__`. +4. If the query is supported, add a cache in Readyset by executing `CREATE CACHE FROM __query__`. -4. If `warmup_time` is NOT configure, a new query rule will be added redirecting this query to Readyset +5. If `warmup_time_s` is NOT configured, a new query rule will be added redirecting this query to Readyset. -5. If `warmup_time` is configured, a new query rule will be added to mirror this query to Readyset. The query will still be redirected to the original hostgroup +6. If `warmup_time_s` is configured, a new query rule will be added to mirror this query to Readyset; the query will still be redirected to the original hostgroup. -6. Once `warmup_time` seconds has elapsed since the query was mirrored, the query rule will be updated to redirect the qury to Readyset instead of mirroring. +7. 
Once `warmup_time_s` seconds have elapsed since the query was mirrored, the query rule will be updated to redirect the query to Readyset instead of mirroring. @@ -20,11 +20,13 @@ This scheduler executes the following steps: -Assuming you have your ProxySQL already Configured you will need to create a new hostgroup and add Readyset to this hostgroup: +Assuming you have already configured ProxySQL, create a new hostgroup and add Readyset to it: ``` -INSERT INTO mysql_servers (hostgroup_id, hostname, port) VALUES (99, '127.0.0.1', 3307); +INSERT INTO mysql_servers (hostgroup_id, hostname, port, comment) VALUES (99, '127.0.0.1', 3307, 'Readyset'); LOAD MYSQL SERVERS TO RUNTIME; SAVE MYSQL SERVERS TO DISK; ``` +*NOTE*: The server's `comment` must contain `Readyset` so the scheduler can identify it. + -To configure the scheduler to run execute: +To configure the scheduler to run, execute: ``` @@ -40,13 +42,66 @@ Configure `/etc/readyset_proxysql_scheduler.cnf` as follow: * `proxysql_user` - (Required) - Proxysql admin user * `proxysql_password` - (Required) - Proxysql admin password * `proxysql_host` - (Required) - Proxysql admin host * `proxysql_port` - (Required) - Proxysql admin port * `readyset_user` - (Required) - Readyset application user * `readyset_password` - (Required) - Readyset application password -* `readyset_host` - (Required) - Readyset host -* `readyset_port` - (Required) - Readyset port * `source_hostgroup` - (Required) - Hostgroup running your Read workload -* `readyset_hostgroup` - (Required) - Hostgroup where Readyset is configure +* `readyset_hostgroup` - (Required) - Hostgroup where Readyset is configured -* `warmup_time` - (Optional) - Time in seconds to mirror a query supported before redirecting the query to Readyset (Default 0 - no mirror) +* `warmup_time_s` - (Optional) - Time in seconds to mirror a supported query before redirecting it to Readyset (Default 0 - no mirror) * `lock_file` - (Optional) - Lock file to prevent two instances of the scheduler to run at the same time (Default '/etc/readyset_scheduler.lock') +* `operation_mode` - (Optional) - Operation mode to run the scheduler. The options are described in [Operation Mode](#operation-mode) (Default All). +* `number_of_queries` - (Optional) - Number of queries to cache in Readyset (Default 10). +* `query_discovery_mode` / `query_discovery_min_execution` / `query_discovery_min_row_sent` - (Optional) - Query Discovery configurations. 
The options are described in [Query Discovery](#query-discovery) (Default CountStar / 0 / 0). + + +# Query Discovery +Query Discovery is the set of configurations used to find queries that are supported by Readyset. It is controlled by the following fields: + +* `query_discovery_mode`: (Optional) - Mode to discover queries to automatically cache in Readyset. The options are described in [Query Discovery Mode](#query-discovery-mode) (Default CountStar). +* `query_discovery_min_execution`: (Optional) - Minimum number of executions of a query to be considered a candidate to be cached (Default 0). +* `query_discovery_min_row_sent`: (Optional) - Minimum number of rows sent by a query to be considered a candidate to be cached (Default 0). + +# Query Discovery Mode +The Query Discovery Mode selects the ranking rule used to discover queries to cache automatically in Readyset. The options are: + +1. `CountStar` - Total Number of Query Executions + * Formula: `total_executions = count_star` + * Description: This metric gives the total number of times the query has been executed. It is valuable for understanding how frequently the query runs. A high count_star value suggests that the query is executed often. + +2. `SumTime` - Total Time Spent Executing the Query + * Formula: `total_execution_time = sum_time` + * Description: This metric represents the total cumulative time spent (measured in microseconds) executing the query across all its executions. It provides a clear understanding of how much processing time the query is consuming over time. A high total execution time can indicate that the query is either frequently executed or is time-intensive to process. + +3. `SumRowsSent` - Total Number of Rows Sent by the Query (sum_rows_sent) + * Formula: `total_rows_sent = sum_rows_sent` + * Description: This metric provides the total number of rows sent to the client across all executions of the query. 
It helps you understand the query’s output volume and the amount of data being transmitted. + +4. `MeanTime` - Average Query Execution Time (Mean) + * Formula: `mean_time = sum_time / count_star` + * Description: The mean time gives you an idea of the typical performance (measured in microseconds) of the query over all executions. It provides a central tendency of how long the query generally takes to execute. + +5. `ExecutionTimeDistance` - Time Distance Between Query Executions + * Formula: `execution_time_distance = max_time - min_time` + * Description: This shows the spread between the fastest and slowest executions of the query (measured in microseconds). A large range might indicate variability in system load, input sizes, or external factors affecting performance. + +6. `QueryThroughput` - Query Throughput + * Formula: `query_throughput = count_star / sum_time` + * Description: This shows how many queries are processed per unit of time (measured in microseconds). It’s useful for understanding system capacity and how efficiently the database is handling the queries. + +7. `WorstBestCase` - Worst Best-Case Query Performance + * Formula: `worst_case = max(min_time)` + * Description: The min_time metric gives the fastest time the query was ever executed (measured in microseconds). It reflects the best-case performance scenario, which could indicate the query’s performance under optimal conditions. + +8. `WorstWorstCase` - Worst Worst-Case Query Performance + * Formula: `worst_case = max(max_time)` + * Description: The max_time shows the slowest time the query was executed (measured in microseconds). This can indicate potential bottlenecks or edge cases where the query underperforms, which could be due to larger data sets, locks, or high server load. + +9. 
`DistanceMeanMax` - Distance Between Mean Time and Max Time (mean_time vs max_time) + * Formula: `distance_mean_max = max_time - mean_time` + * Description: The distance between the mean execution time and the maximum execution time provides insight into how much slower the worst-case execution is compared to the average (measured in microseconds). A large gap indicates significant variability in query performance, which could be caused by certain executions encountering performance bottlenecks, such as large datasets, locking, or high system load. +# Operation Mode +The Operation Mode is a set of possible rules to run the scheduler. The options are: +* `All` - Run `HealthCheck` and `QueryDiscovery` operations. +* `HealthCheck` - Run only the health check operation. +* `QueryDiscovery` - Run only the query discovery operation. # Note Readyset support of MySQL and this scheduler are alpha quality, meaning they are not currently part of our test cycle. Run your own testing before plugging this to your production system. 
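The Query Discovery Mode formulas above can be sketched as a single scoring function over one `stats_mysql_query_digest` row. This is a minimal illustration only, not the scheduler's actual code; the `DigestStats` struct and `rank` function are hypothetical names, with fields mirroring the ProxySQL digest columns the formulas reference:

```rust
/// Hypothetical mirror of the stats_mysql_query_digest columns used by
/// the Query Discovery formulas (all times in microseconds).
struct DigestStats {
    count_star: u64,
    sum_time: u64,
    min_time: u64,
    max_time: u64,
    sum_rows_sent: u64,
}

/// Scores one digest row under a given discovery mode, following the
/// formulas documented above. A higher score means a better cache candidate.
fn rank(mode: &str, s: &DigestStats) -> f64 {
    match mode {
        "CountStar" => s.count_star as f64,
        "SumTime" => s.sum_time as f64,
        "SumRowsSent" => s.sum_rows_sent as f64,
        "MeanTime" => s.sum_time as f64 / s.count_star as f64,
        "ExecutionTimeDistance" => (s.max_time - s.min_time) as f64,
        "QueryThroughput" => s.count_star as f64 / s.sum_time as f64,
        "WorstBestCase" => s.min_time as f64,
        "WorstWorstCase" => s.max_time as f64,
        "DistanceMeanMax" => s.max_time as f64 - s.sum_time as f64 / s.count_star as f64,
        _ => s.count_star as f64, // the default mode is CountStar
    }
}

fn main() {
    // 10 executions totalling 1000us, fastest 50us, slowest 400us.
    let s = DigestStats { count_star: 10, sum_time: 1_000, min_time: 50, max_time: 400, sum_rows_sent: 200 };
    assert_eq!(rank("MeanTime", &s), 100.0);
    assert_eq!(rank("ExecutionTimeDistance", &s), 350.0);
    assert_eq!(rank("DistanceMeanMax", &s), 300.0);
    println!("ok");
}
```

Sorting candidate digests by this score in descending order and keeping the first `number_of_queries` of them matches the behavior the configuration options describe.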
diff --git a/build/docker-compose.yml b/build/docker-compose.yml index 77bf090..a7c89c8 100644 --- a/build/docker-compose.yml +++ b/build/docker-compose.yml @@ -29,6 +29,7 @@ services: - "6034:6034" environment: - UPSTREAM_DB_URL=mysql://root:noria@172.17.0.1:3306/noria + - LISTEN_ADDRESS=0.0.0.0:3307 depends_on: mysql-master: condition: service_healthy diff --git a/build/proxysql.cnf b/build/proxysql.cnf index a45afd7..08a0b4f 100644 --- a/build/proxysql.cnf +++ b/build/proxysql.cnf @@ -17,14 +17,14 @@ mysql_variables= monitor_password="noria" } -mysql_users: +mysql_users= ( { - username = "root" - password = "noria" - default_hostgroup = 1 + username="root" + password="noria" + default_hostgroup=1 max_connections=1000 default_schema="noria" - active = 1 + active=1 } ) diff --git a/build/test.cnf b/build/test.cnf new file mode 100644 index 0000000..80625b0 --- /dev/null +++ b/build/test.cnf @@ -0,0 +1,13 @@ +proxysql_user = 'radmin' +proxysql_password = 'radmin' +proxysql_host = '127.0.0.1' +proxysql_port = 6032 +readyset_user = 'root' +readyset_password = 'noria' +source_hostgroup = 1 +readyset_hostgroup = 2 +warmup_time_s = 10 +lock_file = '/tmp/readyset_scheduler.lock' +operation_mode='All' +number_of_queries=2 +query_discovery_mode='SumTime' diff --git a/readyset_proxysql_scheduler.cnf b/readyset_proxysql_scheduler.cnf index a73df77..f4654b8 100644 --- a/readyset_proxysql_scheduler.cnf +++ b/readyset_proxysql_scheduler.cnf @@ -4,11 +4,10 @@ proxysql_host = '127.0.0.1' proxysql_port = 6032 readyset_user = 'root' readyset_password = 'root' -readyset_host = '127.0.0.1' -readyset_port = 3307 source_hostgroup = 11 readyset_hostgroup = 99 -warmup_time = 60 +warmup_time_s = 60 lock_file = '/tmp/readyset_scheduler.lock' operation_mode="All" number_of_queries=10 +query_discovery_mode='SumTime' \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 4e9ce06..0fe6816 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,7 +14,7 @@ pub enum OperationMode { impl From<String> for 
OperationMode { fn from(s: String) -> Self { - match s.as_str() { + match s.to_lowercase().as_str() { "health_check" => OperationMode::HealthCheck, "query_discovery" => OperationMode::QueryDiscovery, "all" => OperationMode::All, @@ -33,6 +33,39 @@ impl Display for OperationMode { } } +#[derive(serde::Deserialize, Clone, Copy, PartialEq, PartialOrd, Default)] +pub enum QueryDiscoveryMode { + #[default] + CountStar, + SumTime, + SumRowsSent, + MeanTime, + ExecutionTimeDistance, + QueryThroughput, + WorstBestCase, + WorstWorstCase, + DistanceMeanMax, + External, +} + +impl From<String> for QueryDiscoveryMode { + fn from(s: String) -> Self { + match s.to_lowercase().as_str() { + "count_star" => QueryDiscoveryMode::CountStar, + "sum_time" => QueryDiscoveryMode::SumTime, + "sum_rows_sent" => QueryDiscoveryMode::SumRowsSent, + "mean_time" => QueryDiscoveryMode::MeanTime, + "execution_time_distance" => QueryDiscoveryMode::ExecutionTimeDistance, + "query_throughput" => QueryDiscoveryMode::QueryThroughput, + "worst_best_case" => QueryDiscoveryMode::WorstBestCase, + "worst_worst_case" => QueryDiscoveryMode::WorstWorstCase, + "distance_mean_max" => QueryDiscoveryMode::DistanceMeanMax, + "external" => QueryDiscoveryMode::External, + _ => QueryDiscoveryMode::CountStar, + } + } +} + #[derive(serde::Deserialize, Clone)] pub struct Config { pub proxysql_user: String, @@ -41,14 +74,15 @@ pub struct Config { pub proxysql_port: u16, pub readyset_user: String, pub readyset_password: String, - pub readyset_host: String, - pub readyset_port: u16, pub source_hostgroup: u16, pub readyset_hostgroup: u16, - pub warmup_time: Option<u16>, + pub warmup_time_s: Option<u16>, pub lock_file: Option<String>, pub operation_mode: Option<OperationMode>, pub number_of_queries: u16, + pub query_discovery_mode: Option<QueryDiscoveryMode>, + pub query_discovery_min_execution: Option<u64>, + pub query_discovery_min_row_sent: Option<u64>, } pub fn read_config_file(path: &str) -> Result { diff --git a/src/health_check.rs b/src/health_check.rs deleted file mode 100644 index 
37b2930..0000000 --- a/src/health_check.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::{ - config, messages, - server::{self, ServerStatus}, -}; - -pub fn health_check( - proxysql_conn: &mut mysql::PooledConn, - config: &config::Config, - readyset_conn: &mut mysql::PooledConn, -) { - match server::check_readyset_is_ready(readyset_conn) { - Ok(ready) => { - if ready { - let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Online); - } else { - messages::print_info("Readyset is still running Snapshot."); - let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned); - std::process::exit(0); - } - } - Err(e) => { - messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str()); - let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned); - std::process::exit(1); - } - }; -} diff --git a/src/hosts.rs b/src/hosts.rs new file mode 100644 index 0000000..1110fae --- /dev/null +++ b/src/hosts.rs @@ -0,0 +1,222 @@ +use crate::{config::Config, queries::Query}; +use core::fmt; +use mysql::{prelude::Queryable, Conn, OptsBuilder}; + +#[allow(dead_code)] +/// Defines the possible status of a host +#[derive(PartialEq, Clone, Copy)] +pub enum HostStatus { + /// backend server is fully operational + Online, + /// backend sever is temporarily taken out of use because of either too many connection errors in a time that was too short, or the replication lag exceeded the allowed threshold + Shunned, + /// when a server is put into OFFLINE_SOFT mode, no new connections are created toward that server, while the existing connections are kept until they are returned to the connection pool or destructed. In other words, connections are kept in use until multiplexing is enabled again, for example when a transaction is completed. 
This makes it possible to gracefully detach a backend as long as multiplexing is efficient + OfflineSoft, + /// when a server is put into OFFLINE_HARD mode, no new connections are created toward that server and the existing free connections are immediately dropped, while backend connections currently associated with a client session are dropped as soon as the client tries to use them. This is equivalent to deleting the server from a hostgroup. Internally, setting a server in OFFLINE_HARD status is equivalent to deleting the server + OfflineHard, +} + +impl fmt::Display for HostStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + HostStatus::Online => write!(f, "ONLINE"), + HostStatus::Shunned => write!(f, "SHUNNED"), + HostStatus::OfflineSoft => write!(f, "OFFLINE_SOFT"), + HostStatus::OfflineHard => write!(f, "OFFLINE_HARD"), + } + } +} + +impl From<String> for HostStatus { + fn from(s: String) -> Self { + match s.to_uppercase().as_str() { + "ONLINE" => HostStatus::Online, + "SHUNNED" => HostStatus::Shunned, + "OFFLINE_SOFT" => HostStatus::OfflineSoft, + "OFFLINE_HARD" => HostStatus::OfflineHard, + _ => HostStatus::Online, + } + } +} + +/// Represents a Readyset host +pub struct Host { + hostname: String, + port: u16, + status: HostStatus, + conn: Option<Conn>, +} + +impl Host { + /// Creates a new `Host` instance with the given hostname and port. + /// The connection to the host is established during the creation of the instance. + /// If the connection fails, the `conn` field will be `None`. + /// If the connection is successful, the `conn` field will contain the connection. + /// + /// # Arguments + /// + /// * `hostname` - The hostname of the host. + /// * `port` - The port number of the host. + /// + /// # Returns + /// + /// A new `Host` instance. 
+ pub fn new(hostname: String, port: u16, status: String, config: &Config) -> Host { + let conn = match Conn::new( + OptsBuilder::new() + .ip_or_hostname(Some(hostname.clone())) + .tcp_port(port) + .user(Some(config.readyset_user.clone())) + .pass(Some(config.readyset_password.clone())) + .prefer_socket(false), + ) { + Ok(conn) => conn, + Err(err) => { + eprintln!("Failed to establish connection: {}", err); + return Host { + hostname, + port, + status: HostStatus::from(status), + conn: None, + }; + } + }; + + Host { + hostname, + port, + status: HostStatus::from(status), + conn: Some(conn), + } + } + + /// Gets the hostname of the host. + /// + /// # Returns + /// + /// The hostname of the host. + pub fn get_hostname(&self) -> &String { + &self.hostname + } + + /// Gets the port of the host. + /// + /// # Returns + /// + /// The port of the host. + pub fn get_port(&self) -> u16 { + self.port + } + + /// Gets the status of the host. + /// + /// # Returns + /// + /// The status of the host. + pub fn get_status(&self) -> HostStatus { + self.status + } + + /// Changes the status of the host. + /// + /// # Arguments + /// + /// * `status` - The new status of the host. + pub fn change_status(&mut self, status: HostStatus) { + self.status = status; + } + + /// Checks if the host is online. + /// + /// # Returns + /// + /// true if the host is online, false otherwise. + pub fn is_online(&self) -> bool { + self.status == HostStatus::Online + } + + /// Checks if the Readyset host is ready to serve traffic. + /// This is done by querying the SHOW READYSET STATUS command. + /// + /// # Returns + /// + /// true if the host is ready, false otherwise. 
+ pub fn check_readyset_is_ready(&mut self) -> Result<bool, mysql::Error> { + match &mut self.conn { + Some(conn) => { + let rows: Vec<(String, String)> = + conn.query("SHOW READYSET STATUS").unwrap_or(vec![]); + for (field, value) in rows { + if field == "Snapshot Status" { + return Ok(value == "Completed"); + } + } + Ok(false) + } + None => Err(mysql::Error::IoError(std::io::Error::new( + std::io::ErrorKind::Other, + "Connection to Readyset host is not established", + ))), + } + } + + /// Checks if the host supports the given query. + /// This is done by querying the EXPLAIN CREATE CACHE FROM command. + /// + /// # Arguments + /// + /// * `digest_text` - The digest text of the query. + /// * `schema` - The schema of the query. + /// + /// # Returns + /// + /// true if the host supports the query, false otherwise. + pub fn check_query_support( + &mut self, + digest_text: &String, + schema: &String, + ) -> Result<bool, mysql::Error> { + match &mut self.conn { + Some(conn) => { + conn.query_drop(format!("USE {}", schema)) + .expect("Failed to use schema"); + let row: Option<(String, String, String)> = + conn.query_first(format!("EXPLAIN CREATE CACHE FROM {}", digest_text))?; + match row { + Some((_, _, value)) => Ok(value == "yes" || value == "cached"), + None => Ok(false), + } + } + None => Ok(false), + } + } + + /// Caches the given query on the host. + /// This is done by executing the CREATE CACHE FROM command. + /// + /// # Arguments + /// + /// * `digest_text` - The digest text of the query. + /// + /// # Returns + /// + /// true if the query was cached successfully, false otherwise. 
+ pub fn cache_query(&mut self, query: &Query) -> Result<bool, mysql::Error> { + match &mut self.conn { + None => { + return Err(mysql::Error::IoError(std::io::Error::new( + std::io::ErrorKind::Other, + "Connection to Readyset host is not established", + ))) + } + Some(conn) => { + conn.query_drop(format!( + "CREATE CACHE d_{} FROM {}", + query.get_digest(), + query.get_digest_text() + ))?; + } + } + Ok(true) + } +} diff --git a/src/main.rs b/src/main.rs index 16e8e59..9177ef6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,14 @@ mod config; -mod health_check; +mod hosts; mod messages; +mod proxysql; mod queries; -mod server; use clap::Parser; use config::read_config_file; -use mysql::OptsBuilder; -use mysql::{Pool, PoolConstraints, PoolOpts}; - use file_guard::Lock; -use queries::query_discovery; -use server::ServerStatus; +use mysql::{Conn, OptsBuilder}; +use proxysql::ProxySQL; use std::fs::OpenOptions; /// Readyset ProxySQL Scheduler @@ -64,44 +61,7 @@ fn main() { } }; - let proxysql_pool = Pool::new( - OptsBuilder::new() - .ip_or_hostname(Some(config.proxysql_host.as_str())) - .tcp_port(config.proxysql_port) - .user(Some(config.proxysql_user.as_str())) - .pass(Some(config.proxysql_password.as_str())) - .prefer_socket(false) - .pool_opts( - PoolOpts::default() - .with_reset_connection(false) - .with_constraints(PoolConstraints::new(1, 1).unwrap()), - ), - ) - .unwrap(); - let mut proxysql_conn = proxysql_pool.get_conn().unwrap(); - - let readyset_pool = match Pool::new( - OptsBuilder::new() - .ip_or_hostname(Some(config.readyset_host.as_str())) - .tcp_port(config.readyset_port) - .user(Some(config.readyset_user.as_str())) - .pass(Some(config.readyset_password.as_str())) - .prefer_socket(false) - .pool_opts( - PoolOpts::default() - .with_reset_connection(false) - .with_constraints(PoolConstraints::new(1, 1).unwrap()), - ), - ) { - Ok(conn) => conn, - Err(e) => { - messages::print_error(format!("Cannot connect to Readyset: {}.", e).as_str()); - let _ = - 
server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned); - std::process::exit(1); - } - }; - let mut readyset_conn = readyset_pool.get_conn().unwrap(); + let mut proxysql = ProxySQL::new(&config); let running_mode = match config.operation_mode { Some(mode) => mode, @@ -111,13 +71,25 @@ fn main() { if running_mode == config::OperationMode::HealthCheck || running_mode == config::OperationMode::All { - health_check::health_check(&mut proxysql_conn, &config, &mut readyset_conn) + proxysql.health_check(); } + // retain only healthy hosts + //hosts.retain_online(); if running_mode == config::OperationMode::QueryDiscovery || running_mode == config::OperationMode::All { - query_discovery(&mut proxysql_conn, &config, &mut readyset_conn); + let mut conn = Conn::new( + OptsBuilder::new() + .ip_or_hostname(Some(config.proxysql_host.as_str())) + .tcp_port(config.proxysql_port) + .user(Some(config.proxysql_user.as_str())) + .pass(Some(config.proxysql_password.as_str())) + .prefer_socket(false), + ) + .expect("Failed to create ProxySQL connection"); + let mut query_discovery = queries::QueryDiscovery::new(config); + query_discovery.run(&mut proxysql, &mut conn); } messages::print_info("Finished readyset_scheduler"); diff --git a/src/proxysql.rs b/src/proxysql.rs new file mode 100644 index 0000000..0a830ee --- /dev/null +++ b/src/proxysql.rs @@ -0,0 +1,249 @@ +use chrono::{DateTime, Local}; +use mysql::{prelude::Queryable, Conn, OptsBuilder}; + +use crate::{ + config, + hosts::{Host, HostStatus}, + messages, + queries::Query, +}; + +const MIRROR_QUERY_TOKEN: &str = "Mirror by readyset scheduler at"; +const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at"; +pub struct ProxySQL { + readyset_hostgroup: u16, + warmup_time_s: u16, + conn: mysql::Conn, + hosts: Vec<Host>, +} + +impl ProxySQL { + /// This function is used to create a new ProxySQL struct. 
+ /// + /// # Arguments + /// + /// * `config` - A reference to a config::Config containing the configuration for the ProxySQL connection. + /// + /// # Returns + /// + /// A new ProxySQL struct. + pub fn new(config: &config::Config) -> Self { + let mut conn = Conn::new( + OptsBuilder::new() + .ip_or_hostname(Some(config.proxysql_host.as_str())) + .tcp_port(config.proxysql_port) + .user(Some(config.proxysql_user.as_str())) + .pass(Some(config.proxysql_password.as_str())) + .prefer_socket(false), + ) + .expect("Failed to create ProxySQL connection"); + + let query = format!( + "SELECT hostname, port, status, comment FROM mysql_servers WHERE hostgroup_id = {}", + config.readyset_hostgroup + ); + let results: Vec<(String, u16, String, String)> = conn.query(query).unwrap(); + let hosts = results + .into_iter() + .filter_map(|(hostname, port, status, comment)| { + if comment.to_lowercase().contains("readyset") { + Some(Host::new(hostname, port, status, config)) + } else { + None + } + }) + .collect::<Vec<Host>>(); + + ProxySQL { + conn, + readyset_hostgroup: config.readyset_hostgroup, + warmup_time_s: config.warmup_time_s.unwrap_or(0), + hosts, + } + } + + /// This function is used to add a query rule to ProxySQL. + /// + /// # Arguments + /// + /// * `query` - A reference to a Query containing the query to be added as a rule. + /// + /// # Returns + /// + /// A boolean indicating if the rule was added successfully. 
+ pub fn add_as_query_rule(&mut self, query: &Query) -> Result<bool, mysql::Error> { + let datetime_now: DateTime<Local> = Local::now(); + let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); + if self.warmup_time_s > 0 { + self.conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", query.get_user(), self.readyset_hostgroup, query.get_digest(), MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); + messages::print_info("Inserted warm-up rule"); + } else { + self.conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", query.get_user(), self.readyset_hostgroup, query.get_digest(), DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); + messages::print_info("Inserted destination rule"); + } + Ok(true) + } + + pub fn load_query_rules(&mut self) -> Result<bool, mysql::Error> { + self.conn + .query_drop("LOAD MYSQL QUERY RULES TO RUNTIME") + .expect("Failed to load query rules"); + Ok(true) + } + pub fn save_query_rules(&mut self) -> Result<bool, mysql::Error> { + self.conn + .query_drop("SAVE MYSQL QUERY RULES TO DISK") + .expect("Failed to save query rules"); + Ok(true) + } + + /// This function is used to check the current list of queries routed to Readyset. + /// + /// # Arguments + /// * `conn` - A reference to a connection to ProxySQL. + /// + /// # Returns + /// A vector containing the digests of the queries that are currently routed to ReadySet. 
+ pub fn find_queries_routed_to_readyset(&mut self) -> Vec<String> { + let rows: Vec<String> = self + .conn + .query(format!( + "SELECT digest FROM mysql_query_rules WHERE comment LIKE '{}%' OR comment LIKE '{}%'", + MIRROR_QUERY_TOKEN, DESTINATION_QUERY_TOKEN + )) + .expect("Failed to find queries routed to ReadySet"); + rows + } + + /// This function is used to check if any mirror query rule needs to be changed to destination. + /// + /// # Returns + /// + /// A boolean indicating if any mirror query rule was changed to destination. + pub fn adjust_mirror_rules(&mut self) -> Result<bool, mysql::Error> { + let mut updated_rules = false; + let datetime_now: DateTime<Local> = Local::now(); + let tz = datetime_now.format("%z").to_string(); + let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); + let rows: Vec<(u16, String)> = self.conn.query(format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{}: ____-__-__ __:__:__';", MIRROR_QUERY_TOKEN)).expect("Failed to select mirror rules"); + for (rule_id, comment) in rows { + let datetime_mirror_str = comment + .split("Mirror by readyset scheduler at:") + .nth(1) + .unwrap_or("") + .trim(); + let datetime_mirror_str = format!("{} {}", datetime_mirror_str, tz); + let datetime_mirror_rule = + DateTime::parse_from_str(datetime_mirror_str.as_str(), "%Y-%m-%d %H:%M:%S %z") + .unwrap_or_else(|_| { + panic!("Failed to parse datetime from comment: {}", comment); + }); + let elapsed = datetime_now + .signed_duration_since(datetime_mirror_rule) + .num_seconds(); + if elapsed > self.warmup_time_s as i64 { + let comment = format!( + "{}\n Added by readyset scheduler at: {}", + comment, date_formatted + ); + self.conn.query_drop(format!("UPDATE mysql_query_rules SET mirror_hostgroup = NULL, destination_hostgroup = {}, comment = '{}' WHERE rule_id = {}", self.readyset_hostgroup, comment, rule_id)).expect("Failed to update rule"); + messages::print_info( + format!("Updated rule ID {} from warmup to destination", rule_id).as_str(), + ); + 
updated_rules = true; + } + } + Ok(updated_rules) + } + + /// This function is used to check if a given host is healthy. + /// This is done by checking if the Readyset host has an active + /// connection and if the snapshot is completed. + pub fn health_check(&mut self) { + let mut status_changes = Vec::new(); + + for host in self.hosts.iter_mut() { + match host.check_readyset_is_ready() { + Ok(ready) => { + if ready { + status_changes.push((host, HostStatus::Online)); + } else { + messages::print_info("Readyset is still running Snapshot."); + status_changes.push((host, HostStatus::Shunned)); + } + } + Err(e) => { + messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str()); + status_changes.push((host, HostStatus::Shunned)); + } + }; + } + + for (host, status) in status_changes { + if host.get_status() != status { + let where_clause = format!( + "WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}", + self.readyset_hostgroup, + host.get_hostname(), + host.get_port() + ); + messages::print_info( + format!( + "Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}", + self.readyset_hostgroup, + host.get_hostname(), + host.get_port(), + host.get_status(), + status + ) + .as_str(), + ); + let _ = self.conn.query_drop(format!( + "UPDATE mysql_servers SET status = '{}' {}", + status, + where_clause + )); + host.change_status(status); + let _ = self.conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME"); + let _ = self.conn.query_drop("SAVE MYSQL SERVERS TO DISK"); + } + } + } + + /// This function is used to get the number of online hosts. + /// This is done by filtering the hosts vector and counting the number of hosts with status Online. + /// + /// # Returns + /// + /// A u16 containing the number of online hosts. + pub fn number_of_online_hosts(&self) -> u16 { + self.hosts + .iter() + .filter(|host| host.is_online()) + .collect::<Vec<&Host>>() + .len() as u16 + } + + /// This function is used to get the first online host. 
+ /// This is done by iterating over the hosts vector and returning the first host with status Online. + /// + /// # Returns + /// + /// An Option containing a reference to the first online host. + pub fn get_first_online_host(&mut self) -> Option<&mut Host> { + self.hosts.iter_mut().find(|host| host.is_online()) + } + + /// This function is used to get all the online hosts. + /// This is done by filtering the hosts vector and collecting the hosts with status Online. + /// + /// # Returns + /// + /// A vector containing references to the online hosts. + pub fn get_online_hosts(&mut self) -> Vec<&mut Host> { + self.hosts + .iter_mut() + .filter(|host| host.is_online()) + .collect() + } +} diff --git a/src/queries.rs b/src/queries.rs index e8fbe4a..a8b738e 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -1,201 +1,262 @@ -use chrono::{DateTime, Local}; -use mysql::{prelude::Queryable, PooledConn}; - use crate::{ - config::{self, Config}, + config::{Config, QueryDiscoveryMode}, messages, + proxysql::ProxySQL, }; +use mysql::{prelude::Queryable, Conn}; -const MIRROR_QUERY_TOKEN: &str = "Mirror by readyset scheduler at"; -const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at"; - -/// This function is used to find queries that are not cached in ReadySet and are not in the mysql_query_rules table. -/// -/// # Arguments -/// * `conn` - A mutable reference to a pooled connection to ProxySQL. -/// * `config` - A reference to the configuration struct. -/// -/// # Returns -/// A vector of tuples containing the digest_text, digest, and schema name of the queries that are not cached in ReadySet and are not in the mysql_query_rules table. 
-pub fn find_queries_to_cache( - conn: &mut PooledConn, - config: &Config, -) -> Vec<(String, String, String)> { - let rows: Vec<(String, String, String)> = conn - .query(format!( - "SELECT s.digest_text, s.digest, s.schemaname - FROM stats_mysql_query_digest s - LEFT JOIN mysql_query_rules q - USING(digest) - WHERE s.hostgroup = {} - AND s.username = '{}' - AND s.schemaname NOT IN ('sys', 'information_schema', 'performance_schema', 'mysql') - AND s.digest_text LIKE 'SELECT%FROM%' - AND digest_text NOT LIKE '%?=?%' - AND s.sum_rows_sent > 0 - AND q.rule_id IS NULL - ORDER BY s.sum_rows_sent DESC - LIMIT {}", - config.source_hostgroup, - config.readyset_user, - (config.number_of_queries * 2) - )) - .expect("Failed to find queries to cache"); - rows -} - -/// This function is used to check the current list of queries routed to Readyset. -/// -/// # Arguments -/// * `conn` - A mutable reference to a pooled connection to ProxySQL. -/// -/// # Returns -/// A vector of tuples containing the digest_text, digest, and schemaname of the queries that are currently routed to ReadySet. -pub fn find_queries_routed_to_readyset(conn: &mut PooledConn) -> Vec { - let rows: Vec = conn - .query(format!( - "SELECT digest FROM mysql_query_rules WHERE comment LIKE '{}%' OR comment LIKE '{}%'", - MIRROR_QUERY_TOKEN, DESTINATION_QUERY_TOKEN - )) - .expect("Failed to find queries routed to ReadySet"); - rows +pub struct Query { + digest_text: String, + digest: String, + schema: String, + user: String, } -pub fn replace_placeholders(query: &str) -> String { - // date placeholder +impl Query { + /// This function is used to create a new Query struct. + /// + /// # Arguments + /// + /// * `digest_text` - A string containing the digest text of the query. + /// * `digest` - A string containing the digest of the query. + /// * `schema` - A string containing the schema name of the query. + /// * `user` - A string containing the user that executed the query. 
+ /// + /// # Returns + /// + /// A new Query struct. + fn new(digest_text: String, digest: String, schema: String, user: String) -> Self { + Query { + digest_text, + digest, + schema, + user, + } + } - query.replace("?-?-?", "?") -} + /// This function is used to get the digest text of the query. + /// + /// # Returns + /// A string containing the digest text of the query. + pub fn get_digest_text(&self) -> &String { + &self.digest_text + } -pub fn check_readyset_query_support( - conn: &mut PooledConn, - digest_text: &String, - schema: &String, -) -> Result { - conn.query_drop(format!("USE {}", schema)) - .expect("Failed to use schema"); - let row: Option<(String, String, String)> = - conn.query_first(format!("EXPLAIN CREATE CACHE FROM {}", digest_text))?; - match row { - Some((_, _, value)) => Ok(value == "yes" || value == "cached"), - None => Ok(false), + /// This function is used to get the digest of the query. + /// + /// # Returns + /// + /// A string containing the digest of the query. + pub fn get_digest(&self) -> &String { + &self.digest } -} -pub fn cache_query( - conn: &mut PooledConn, - digest_text: &String, - digest: &String, -) -> Result { - conn.query_drop(format!("CREATE CACHE d_{} FROM {}", digest, digest_text)) - .expect("Failed to create readyset cache"); - Ok(true) -} + /// This function is used to get the schema name of the query. + /// + /// # Returns + /// + /// A string containing the schema name of the query. 
+ pub fn get_schema(&self) -> &String { + &self.schema + } -pub fn add_query_rule( - conn: &mut PooledConn, - digest: &String, - config: &Config, -) -> Result { - let datetime_now: DateTime = Local::now(); - let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); - if config.warmup_time.is_some() { - conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); - messages::print_info("Inserted warm-up rule"); - } else { - conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); - messages::print_info("Inserted destination rule"); + /// This function is used to get the user that executed the query. + /// + /// # Returns + /// + /// A string containing the user that executed the query. 
+ pub fn get_user(&self) -> &String { + &self.user } - Ok(true) } -pub fn load_query_rules(conn: &mut PooledConn) -> Result { - conn.query_drop("LOAD MYSQL QUERY RULES TO RUNTIME") - .expect("Failed to load query rules"); - Ok(true) -} -pub fn save_query_rules(conn: &mut PooledConn) -> Result { - conn.query_drop("SAVE MYSQL QUERY RULES TO DISK") - .expect("Failed to load query rules"); - Ok(true) +pub struct QueryDiscovery { + query_discovery_mode: QueryDiscoveryMode, + query_discovery_min_execution: u64, + query_discovery_min_rows_sent: u64, + source_hostgroup: u16, + readyset_user: String, + number_of_queries: u16, + offset: u16, } -pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result { - let mut updated_rules = false; - let datetime_now: DateTime = Local::now(); - let tz = datetime_now.format("%z").to_string(); - let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); - let rows: Vec<(u16, String)> = conn.query(format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{}: ____-__-__ __:__:__';", MIRROR_QUERY_TOKEN)).expect("Failed to select mirror rules"); - for (rule_id, comment) in rows { - let datetime_mirror_str = comment - .split("Mirror by readyset scheduler at:") - .nth(1) - .unwrap_or("") - .trim(); - let datetime_mirror_str = format!("{} {}", datetime_mirror_str, tz); - let datetime_mirror_rule = - DateTime::parse_from_str(datetime_mirror_str.as_str(), "%Y-%m-%d %H:%M:%S %z") - .unwrap_or_else(|_| { - panic!("Failed to parse datetime from comment: {}", comment); - }); - let elapsed = datetime_now - .signed_duration_since(datetime_mirror_rule) - .num_seconds(); - if elapsed > config.warmup_time.unwrap() as i64 { - let comment = format!( - "{}\n Added by readyset scheduler at: {}", - comment, date_formatted - ); - conn.query_drop(format!("UPDATE mysql_query_rules SET mirror_hostgroup = NULL, destination_hostgroup = {}, comment = '{}' WHERE rule_id = {}", config.readyset_hostgroup, comment, 
rule_id)).expect("Failed to update rule"); - messages::print_info( - format!("Updated rule ID {} from warmup to destination", rule_id).as_str(), - ); - updated_rules = true; +/// Query Discovery is a feature responsible for discovering queries that are hurting the database performance. +/// The queries are discovered by analyzing the stats_mysql_query_digest table and finding queries that are not cached in ReadySet and are not in the mysql_query_rules table. +/// Query discovery is also responsible for promoting queries from mirror (warmup) to destination. +impl QueryDiscovery { + /// This function is used to create a new QueryDiscovery struct. + /// + /// # Arguments + /// + /// * `config` - A Config containing the configuration for the query discovery. + /// + /// # Returns + /// + /// A new QueryDiscovery struct. + pub fn new(config: Config) -> Self { + QueryDiscovery { + query_discovery_mode: config + .query_discovery_mode + .unwrap_or(QueryDiscoveryMode::CountStar), + query_discovery_min_execution: config.query_discovery_min_execution.unwrap_or(0), + query_discovery_min_rows_sent: config.query_discovery_min_row_sent.unwrap_or(0), + source_hostgroup: config.source_hostgroup, + readyset_user: config.readyset_user.clone(), + number_of_queries: config.number_of_queries, + offset: 0, } } - Ok(updated_rules) -} -pub fn query_discovery( - proxysql_conn: &mut mysql::PooledConn, - config: &config::Config, - readyset_conn: &mut mysql::PooledConn, -) { - let mut queries_added_or_change = adjust_mirror_rules(proxysql_conn, config).unwrap(); + /// This function is used to generate the query responsible for finding queries that are not cached in ReadySet and are not in the mysql_query_rules table. + /// Queries have to return 3 fields: digest_text, digest, and schema name.
+ /// + /// # Returns + /// + /// A string containing the query responsible for finding queries that are not cached in ReadySet and are not in the mysql_query_rules table. + fn query_builder(&self) -> String { + let order_by = match self.query_discovery_mode { + QueryDiscoveryMode::SumRowsSent => "s.sum_rows_sent".to_string(), + QueryDiscoveryMode::SumTime => "s.sum_time".to_string(), + QueryDiscoveryMode::MeanTime => "(s.sum_time / s.count_star)".to_string(), + QueryDiscoveryMode::CountStar => "s.count_star".to_string(), + QueryDiscoveryMode::ExecutionTimeDistance => "(s.max_time - s.min_time)".to_string(), + QueryDiscoveryMode::QueryThroughput => "(s.count_star / s.sum_time)".to_string(), + QueryDiscoveryMode::WorstBestCase => "s.min_time".to_string(), + QueryDiscoveryMode::WorstWorstCase => "s.max_time".to_string(), + QueryDiscoveryMode::DistanceMeanMax => { + "(s.max_time - (s.sum_time / s.count_star))".to_string() + } + QueryDiscoveryMode::External => unreachable!("External mode is caught earlier"), + }; - let mut current_queries_digest: Vec = find_queries_routed_to_readyset(proxysql_conn); - let rows: Vec<(String, String, String)> = find_queries_to_cache(proxysql_conn, config); + format!( + "SELECT s.digest_text, s.digest, s.schemaname + FROM stats_mysql_query_digest s + LEFT JOIN mysql_query_rules q + USING(digest) + WHERE s.hostgroup = {} + AND s.username = '{}' + AND s.schemaname NOT IN ('sys', 'information_schema', 'performance_schema', 'mysql') + AND s.digest_text LIKE 'SELECT%FROM%' + AND digest_text NOT LIKE '%?=?%' + AND s.count_star > {} + AND s.sum_rows_sent > {} + AND q.rule_id IS NULL + ORDER BY {} DESC + LIMIT {} OFFSET {}", + self.source_hostgroup, + self.readyset_user, + self.query_discovery_min_execution, + self.query_discovery_min_rows_sent, + order_by, + self.number_of_queries, + self.offset + ) + } - for
(digest_text, digest, schema) in rows { - if current_queries_digest.len() > config.number_of_queries as usize { - break; + pub fn run(&mut self, proxysql: &mut ProxySQL, conn: &mut Conn) { + if proxysql.number_of_online_hosts() == 0 { + return; } - let digest_text = replace_placeholders(&digest_text); - messages::print_info(format!("Going to test query support for {}", digest_text).as_str()); - let supported = check_readyset_query_support(readyset_conn, &digest_text, &schema); - match supported { - Ok(true) => { + + let mut queries_added_or_change = proxysql.adjust_mirror_rules().unwrap(); + + let mut current_queries_digest: Vec<String> = proxysql.find_queries_routed_to_readyset(); + + let mut more_queries = true; + while more_queries && current_queries_digest.len() < self.number_of_queries as usize { + let queries_to_cache = self.find_queries_to_cache(conn); + more_queries = !queries_to_cache.is_empty(); + for query in queries_to_cache.iter() { + if current_queries_digest.len() >= self.number_of_queries as usize { + break; + } + let digest_text = self.replace_placeholders(query.get_digest_text()); messages::print_info( - "Query is supported, adding it to proxysql and readyset" - .to_string() - .as_str(), + format!("Going to test query support for {}", digest_text).as_str(), ); - queries_added_or_change = true; - cache_query(readyset_conn, &digest_text, &digest) - .expect("Failed to create readyset cache"); - add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule"); - current_queries_digest.push(digest); + let supported = proxysql + .get_first_online_host() + .unwrap() + .check_query_support(&digest_text, query.get_schema()); // Safe to unwrap because we checked if hosts is empty + match supported { + Ok(true) => { + messages::print_info( + "Query is supported, adding it to proxysql and readyset" + .to_string() + .as_str(), + ); + queries_added_or_change = true; + proxysql.get_online_hosts().iter_mut().for_each(|host| {
host.cache_query(query) + .expect("Failed to create readyset cache"); + }); + proxysql + .add_as_query_rule(query) + .expect("Failed to add query rule"); + current_queries_digest.push(query.get_digest().to_string()); + } + Ok(false) => { + messages::print_info("Query is not supported"); + } + Err(err) => { + messages::print_warning( + format!("Failed to check query support: {}", err).as_str(), + ); + } + } } - Ok(false) => { - messages::print_info("Query is not supported"); + self.offset += queries_to_cache.len() as u16; + } + if queries_added_or_change { + proxysql + .load_query_rules() + .expect("Failed to load query rules"); + proxysql + .save_query_rules() + .expect("Failed to save query rules"); + } + } + + /// This function is used to find queries that are not cached in ReadySet and are not in the mysql_query_rules table. + /// + /// # Arguments + /// * `con` - A mutable reference to a connection to ProxySQL. + /// + /// # Returns + /// A vector of Query structs representing the queries that are not cached in ReadySet and are not in the mysql_query_rules table.
+ fn find_queries_to_cache(&self, con: &mut Conn) -> Vec<Query> { + match self.query_discovery_mode { + QueryDiscoveryMode::External => { + todo!("External mode is not implemented yet"); } - Err(err) => { - messages::print_warning(format!("Failed to check query support: {}", err).as_str()); + _ => { + let query = self.query_builder(); + let rows: Vec<(String, String, String)> = + con.query(query).expect("Failed to find queries to cache"); + rows.iter() + .map(|(digest_text, digest, schema)| { + Query::new( + digest_text.to_string(), + digest.to_string(), + schema.to_string(), + self.readyset_user.clone(), + ) + }) + .collect() } } } - if queries_added_or_change { - load_query_rules(proxysql_conn).expect("Failed to load query rules"); - save_query_rules(proxysql_conn).expect("Failed to save query rules"); + + fn replace_placeholders(&self, query: &str) -> String { + // date placeholder + query.replace("?-?-?", "?") } } diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index 3ea9e13..0000000 --- a/src/server.rs +++ /dev/null @@ -1,71 +0,0 @@ -use core::fmt; - -use crate::{config::Config, messages}; -use mysql::{prelude::Queryable, PooledConn}; - -#[allow(dead_code)] -pub enum ServerStatus { - //backend server is fully operational - Online, - //backend sever is temporarily taken out of use because of either too many connection errors in a time that was too short, or the replication lag exceeded the allowed threshold - Shunned, - //when a server is put into OFFLINE_SOFT mode, no new connections are created toward that server, while the existing connections are kept until they are returned to the connection pool or destructed. In other words, connections are kept in use until multiplexing is enabled again, for example when a transaction is completed.
This makes it possible to gracefully detach a backend as long as multiplexing is efficient - OfflineSoft, - //when a server is put into OFFLINE_HARD mode, no new connections are created toward that server and the existing **free **connections are ** immediately dropped**, while backend connections currently associated with a client session are dropped as soon as the client tries to use them. This is equivalent to deleting the server from a hostgroup. Internally, setting a server in OFFLINE_HARD status is equivalent to deleting the server - OfflineHard, -} - -impl fmt::Display for ServerStatus { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ServerStatus::Online => write!(f, "ONLINE"), - ServerStatus::Shunned => write!(f, "SHUNNED"), - ServerStatus::OfflineSoft => write!(f, "OFFLINE_SOFT"), - ServerStatus::OfflineHard => write!(f, "OFFLINE_HARD"), - } - } -} - -pub fn check_readyset_is_ready(rs_conn: &mut PooledConn) -> Result { - let rows: Vec<(String, String)> = rs_conn.query("SHOW READYSET STATUS").unwrap_or(vec![]); - for (field, value) in rows { - if field == "Snapshot Status" { - return Ok(value == "Completed"); - } - } - Ok(false) -} - -pub fn change_server_status( - ps_conn: &mut PooledConn, - config: &Config, - server_status: ServerStatus, -) -> Result { - let where_clause = format!( - "WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}", - config.readyset_hostgroup, config.readyset_host, config.readyset_port - ); - let select_query = format!("SELECT status FROM runtime_mysql_servers {}", where_clause); - let status: Option = ps_conn.query_first(select_query)?; - if status.as_ref().unwrap() != &server_status.to_string() { - messages::print_info( - format!( - "Server HG: {}, Host: {}, Port: {} is currently {}. 
Changing to {}", - config.readyset_hostgroup, - config.readyset_host, - config.readyset_port, - status.unwrap(), - server_status - ) - .as_str(), - ); - ps_conn.query_drop(format!( - "UPDATE mysql_servers SET status = '{}' {}", - server_status, where_clause - ))?; - ps_conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME")?; - ps_conn.query_drop("SAVE MYSQL SERVERS TO DISK")?; - } - - Ok(true) -}