From 40576154b9682e18679775b58816efb70328f049 Mon Sep 17 00:00:00 2001 From: Marcelo Altmann Date: Mon, 2 Sep 2024 17:59:53 -0300 Subject: [PATCH] Add host list capability This commit adds the ability to read a list of hosts from mysql_servers table instead of a single host from config file. This merges the old server.rs into the hosts file. This also remove the Pooled connection to a single connection. Fixes: #5 --- build/docker-compose.yml | 1 + build/test.cnf | 12 ++ readyset_proxysql_scheduler.cnf | 4 +- src/config.rs | 2 - src/health_check.rs | 21 +-- src/hosts.rs | 272 ++++++++++++++++++++++++++++++++ src/main.rs | 53 ++----- src/queries.rs | 104 +++--------- src/server.rs | 71 --------- 9 files changed, 332 insertions(+), 208 deletions(-) create mode 100644 build/test.cnf create mode 100644 src/hosts.rs delete mode 100644 src/server.rs diff --git a/build/docker-compose.yml b/build/docker-compose.yml index 77bf090..a7c89c8 100644 --- a/build/docker-compose.yml +++ b/build/docker-compose.yml @@ -29,6 +29,7 @@ services: - "6034:6034" environment: - UPSTREAM_DB_URL=mysql://root:noria@172.17.0.1:3306/noria + - LISTEN_ADDRESS=0.0.0.0:3307 depends_on: mysql-master: condition: service_healthy diff --git a/build/test.cnf b/build/test.cnf new file mode 100644 index 0000000..cc3b7d0 --- /dev/null +++ b/build/test.cnf @@ -0,0 +1,12 @@ +proxysql_user = 'radmin' +proxysql_password = 'radmin' +proxysql_host = '127.0.0.1' +proxysql_port = 6032 +readyset_user = 'root' +readyset_password = 'noria' +source_hostgroup = 1 +readyset_hostgroup = 2 +warmup_time = 60 +lock_file = '/tmp/readyset_scheduler.lock' +operation_mode="All" +number_of_queries=10 diff --git a/readyset_proxysql_scheduler.cnf b/readyset_proxysql_scheduler.cnf index a73df77..7b44f3e 100644 --- a/readyset_proxysql_scheduler.cnf +++ b/readyset_proxysql_scheduler.cnf @@ -1,11 +1,9 @@ -proxysql_user = 'admin' +proxysql_user = 'radmin' proxysql_password = 'admin' proxysql_host = '127.0.0.1' proxysql_port = 6032 readyset_user = 'root' readyset_password = 'root' -readyset_host = '127.0.0.1' -readyset_port = 3307 source_hostgroup = 11 readyset_hostgroup = 99 warmup_time = 60 diff --git a/src/config.rs b/src/config.rs index 4e9ce06..90ae6f1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,8 +41,6 @@ pub struct Config { pub proxysql_port: u16, pub readyset_user: String, pub readyset_password: String, - pub readyset_host: String, - pub readyset_port: u16, pub source_hostgroup: u16, pub readyset_hostgroup: u16, pub warmup_time: Option, diff --git a/src/health_check.rs b/src/health_check.rs index 37b2930..bde914d 100644 --- a/src/health_check.rs +++ b/src/health_check.rs @@ -1,27 +1,22 @@ use crate::{ - config, messages, - server::{self, ServerStatus}, + config, + hosts::{Host, HostStatus}, + messages, }; -pub fn health_check( - proxysql_conn: &mut mysql::PooledConn, - config: &config::Config, - readyset_conn: &mut mysql::PooledConn, -) { - match server::check_readyset_is_ready(readyset_conn) { +pub fn health_check(proxysql_conn: &mut mysql::Conn, config: &config::Config, host: &mut Host) { + match host.check_readyset_is_ready() { Ok(ready) => { if ready { - let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Online); + let _ = host.change_status(proxysql_conn, config, HostStatus::Online); } else { messages::print_info("Readyset is still running Snapshot."); - let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned); - std::process::exit(0); + let _ = host.change_status(proxysql_conn, config, HostStatus::Shunned); } } Err(e) => { messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str()); - let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned); - std::process::exit(1); + let _ = host.change_status(proxysql_conn, config, HostStatus::Shunned); } }; } diff --git a/src/hosts.rs b/src/hosts.rs new file mode 100644 index 0000000..2275255 --- /dev/null +++ b/src/hosts.rs @@ -0,0 +1,272 @@ +use crate::{config::Config, messages}; +use core::fmt; +use mysql::{prelude::Queryable, Conn, OptsBuilder}; + +#[allow(dead_code)] +/// Defines the possible status of a host +#[derive(PartialEq, Clone, Copy)] +pub enum HostStatus { + ///backend server is fully operational + Online, + //backend sever is temporarily taken out of use because of either too many connection errors in a time that was too short, or the replication lag exceeded the allowed threshold + Shunned, + //when a server is put into OFFLINE_SOFT mode, no new connections are created toward that server, while the existing connections are kept until they are returned to the connection pool or destructed. In other words, connections are kept in use until multiplexing is enabled again, for example when a transaction is completed. This makes it possible to gracefully detach a backend as long as multiplexing is efficient + OfflineSoft, + //when a server is put into OFFLINE_HARD mode, no new connections are created toward that server and the existing **free **connections are ** immediately dropped**, while backend connections currently associated with a client session are dropped as soon as the client tries to use them. This is equivalent to deleting the server from a hostgroup. Internally, setting a server in OFFLINE_HARD status is equivalent to deleting the server + OfflineHard, +} + +impl fmt::Display for HostStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + HostStatus::Online => write!(f, "ONLINE"), + HostStatus::Shunned => write!(f, "SHUNNED"), + HostStatus::OfflineSoft => write!(f, "OFFLINE_SOFT"), + HostStatus::OfflineHard => write!(f, "OFFLINE_HARD"), + } + } +} + +impl From for HostStatus { + fn from(s: String) -> Self { + match s.to_uppercase().as_str() { + "ONLINE" => HostStatus::Online, + "SHUNNED" => HostStatus::Shunned, + "OFFLINE_SOFT" => HostStatus::OfflineSoft, + "OFFLINE_HARD" => HostStatus::OfflineHard, + _ => HostStatus::Online, + } + } +} + +/// Represents a Readyset host +pub struct Host { + hostname: String, + port: u16, + status: HostStatus, + conn: Option, +} + +impl Host { + /// Creates a new `Host` instance with the given hostname and port. + /// The connection to the host is established during the creation of the instance. + /// If the connection fails, the `conn` field will be `None`. + /// If the connection is successful, the `conn` field will contain the connection. + /// + /// # Arguments + /// + /// * `hostname` - The hostname of the host. + /// * `port` - The port number of the host. + /// + /// # Returns + /// + /// A new `Host` instance. + pub fn new(hostname: String, port: u16, status: String, config: &Config) -> Host { + let conn = match Conn::new( + OptsBuilder::new() + .ip_or_hostname(Some(hostname.clone())) + .tcp_port(port) + .user(Some(config.readyset_user.clone())) + .pass(Some(config.readyset_password.clone())) + .prefer_socket(false), + ) { + Ok(conn) => conn, + Err(err) => { + eprintln!("Failed to establish connection: {}", err); + return Host { + hostname, + port, + status: HostStatus::from(status), + conn: None, + }; + } + }; + + Host { + hostname, + port, + status: HostStatus::from(status), + conn: Some(conn), + } + } + + /// Gets the hostname of the host. + /// + /// # Returns + /// + /// The hostname of the host. + pub fn get_hostname(&self) -> &String { + &self.hostname + } + + /// Gets the port of the host. + /// + /// # Returns + /// + /// The port of the host. + pub fn get_port(&self) -> u16 { + self.port + } + + /// Gets the status of the host. + /// + /// # Returns + /// + /// The status of the host. + fn get_status(&self) -> HostStatus { + self.status + } + + /// Checks if the host is online. + /// + /// # Returns + /// + /// true if the host is online, false otherwise. + pub fn is_online(&self) -> bool { + self.status == HostStatus::Online + } + + /// Checks if the Readyset host is ready to serve traffic. + /// This is done by querying the SHOW READYSET STATUS command. + /// + /// # Returns + /// + /// true if the host is ready, false otherwise. + pub fn check_readyset_is_ready(&mut self) -> Result { + match &mut self.conn { + Some(conn) => { + let rows: Vec<(String, String)> = + conn.query("SHOW READYSET STATUS").unwrap_or(vec![]); + for (field, value) in rows { + if field == "Snapshot Status" { + return Ok(value == "Completed"); + } + } + Ok(false) + } + None => Err(mysql::Error::from(mysql::Error::IoError(std::io::Error::new( + std::io::ErrorKind::Other, + "Connection to host is not established", + )))), + } + } + + /// Checks if the host supports the given query. + /// This is done by querying the EXPLAIN CREATE CACHE FROM command. + /// + /// # Arguments + /// + /// * `digest_text` - The digest text of the query. + /// * `schema` - The schema of the query. + /// + /// # Returns + /// + /// true if the host supports the query, false otherwise. + pub fn check_query_support( + &mut self, + digest_text: &String, + schema: &String, + ) -> Result { + match &mut self.conn { + Some(conn) => { + conn.query_drop(format!("USE {}", schema)) + .expect("Failed to use schema"); + let row: Option<(String, String, String)> = + conn.query_first(format!("EXPLAIN CREATE CACHE FROM {}", digest_text))?; + match row { + Some((_, _, value)) => Ok(value == "yes" || value == "cached"), + None => Ok(false), + } + } + None => Ok(false), + } + } + + /// Caches the given query on the host. + /// This is done by executing the CREATE CACHE FROM command. + /// + /// # Arguments + /// + /// * `digest_text` - The digest text of the query. + /// + /// # Returns + /// + /// true if the query was cached successfully, false otherwise. + pub fn cache_query(&mut self, digest_text: &String) -> Result { + match &mut self.conn { + None => return Ok(false), + Some(conn) => { + conn.query_drop(format!("CREATE CACHE FROM {}", digest_text)) + .expect("Failed to create readyset cache"); + } + } + Ok(true) + } + + /// Changes the status of the host in the ProxySQL mysql_servers table. + /// The status is set to the given `status`. + pub fn change_status( + &mut self, + ps_conn: &mut Conn, + config: &Config, + status: HostStatus, + ) -> Result { + let where_clause = format!( + "WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}", + config.readyset_hostgroup, + self.get_hostname(), + self.get_port() + ); + if self.status != status { + messages::print_info( + format!( + "Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}", + config.readyset_hostgroup, + self.get_hostname(), + self.get_port(), + self.get_status(), + status + ) + .as_str(), + ); + self.status = status; + ps_conn.query_drop(format!( + "UPDATE mysql_servers SET status = '{}' {}", + self.get_status(), + where_clause + ))?; + ps_conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME")?; + ps_conn.query_drop("SAVE MYSQL SERVERS TO DISK")?; + } + + Ok(true) + } +} + +/// Fetches the hosts from the ProxySQL mysql_servers table. +/// +/// # Arguments +/// +/// * `proxysql_conn` - The connection to the ProxySQL instance. +/// * `config` - The configuration object. +/// +/// # Returns +/// +/// A vector of `Host` instances. +pub fn get_hosts<'a>(proxysql_conn: &'a mut Conn, config: &'a Config) -> Vec { + let query = format!( + "SELECT hostname, port, status, comment FROM mysql_servers WHERE hostgroup_id = {}", + config.readyset_hostgroup + ); + let results: Vec<(String, u16, String, String)> = proxysql_conn.query(query).unwrap(); + results + .into_iter() + .filter_map(|(hostname, port, status, comment)| { + if comment.to_lowercase().contains("readyset") { + Some(Host::new(hostname, port, status, config)) + } else { + None + } + }) + .collect::>() +} diff --git a/src/main.rs b/src/main.rs index 16e8e59..acf2dbb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,15 @@ mod config; mod health_check; +mod hosts; mod messages; mod queries; -mod server; use clap::Parser; use config::read_config_file; -use mysql::OptsBuilder; -use mysql::{Pool, PoolConstraints, PoolOpts}; +use mysql::{Conn, OptsBuilder}; use file_guard::Lock; use queries::query_discovery; -use server::ServerStatus; use std::fs::OpenOptions; /// Readyset ProxySQL Scheduler @@ -63,61 +61,40 @@ fn main() { std::process::exit(1); } }; - - let proxysql_pool = Pool::new( + let mut proxysql_conn = Conn::new( OptsBuilder::new() .ip_or_hostname(Some(config.proxysql_host.as_str())) .tcp_port(config.proxysql_port) .user(Some(config.proxysql_user.as_str())) .pass(Some(config.proxysql_password.as_str())) - .prefer_socket(false) - .pool_opts( - PoolOpts::default() - .with_reset_connection(false) - .with_constraints(PoolConstraints::new(1, 1).unwrap()), - ), + .prefer_socket(false), ) .unwrap(); - let mut proxysql_conn = proxysql_pool.get_conn().unwrap(); - - let readyset_pool = match Pool::new( - OptsBuilder::new() - .ip_or_hostname(Some(config.readyset_host.as_str())) - .tcp_port(config.readyset_port) - .user(Some(config.readyset_user.as_str())) - .pass(Some(config.readyset_password.as_str())) - .prefer_socket(false) - .pool_opts( - PoolOpts::default() - .with_reset_connection(false) - .with_constraints(PoolConstraints::new(1, 1).unwrap()), - ), - ) { - Ok(conn) => conn, - Err(e) => { - messages::print_error(format!("Cannot connect to Readyset: {}.", e).as_str()); - let _ = - server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned); - std::process::exit(1); - } - }; - let mut readyset_conn = readyset_pool.get_conn().unwrap(); let running_mode = match config.operation_mode { Some(mode) => mode, None => config::OperationMode::All, }; + let mut hosts = hosts::get_hosts(&mut proxysql_conn, &config); + if running_mode == config::OperationMode::HealthCheck || running_mode == config::OperationMode::All { - health_check::health_check(&mut proxysql_conn, &config, &mut readyset_conn) + hosts.iter_mut().for_each(|host| { + health_check::health_check(&mut proxysql_conn, &config, host); + }); } + // retain only healthy hosts + let hosts: Vec = hosts + .into_iter() + .filter(|host| host.is_online()) + .collect(); if running_mode == config::OperationMode::QueryDiscovery || running_mode == config::OperationMode::All { - query_discovery(&mut proxysql_conn, &config, &mut readyset_conn); + query_discovery(&mut proxysql_conn, &config, hosts); } messages::print_info("Finished readyset_scheduler"); diff --git a/src/queries.rs b/src/queries.rs index e8fbe4a..e3a4b3c 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -1,26 +1,13 @@ use chrono::{DateTime, Local}; -use mysql::{prelude::Queryable, PooledConn}; +use mysql::{prelude::Queryable, Conn}; use crate::{ config::{self, Config}, + hosts::Host, messages, }; -const MIRROR_QUERY_TOKEN: &str = "Mirror by readyset scheduler at"; -const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at"; - -/// This function is used to find queries that are not cached in ReadySet and are not in the mysql_query_rules table. -/// -/// # Arguments -/// * `conn` - A mutable reference to a pooled connection to ProxySQL. -/// * `config` - A reference to the configuration struct. -/// -/// # Returns -/// A vector of tuples containing the digest_text, digest, and schema name of the queries that are not cached in ReadySet and are not in the mysql_query_rules table. -pub fn find_queries_to_cache( - conn: &mut PooledConn, - config: &Config, -) -> Vec<(String, String, String)> { +pub fn find_queries_to_cache(conn: &mut Conn, config: &Config) -> Vec<(String, String, String)> { let rows: Vec<(String, String, String)> = conn .query(format!( "SELECT s.digest_text, s.digest, s.schemaname @@ -34,30 +21,10 @@ pub fn find_queries_to_cache( AND digest_text NOT LIKE '%?=?%' AND s.sum_rows_sent > 0 AND q.rule_id IS NULL - ORDER BY s.sum_rows_sent DESC - LIMIT {}", - config.source_hostgroup, - config.readyset_user, - (config.number_of_queries * 2) - )) - .expect("Failed to find queries to cache"); - rows -} - -/// This function is used to check the current list of queries routed to Readyset. -/// -/// # Arguments -/// * `conn` - A mutable reference to a pooled connection to ProxySQL. -/// -/// # Returns -/// A vector of tuples containing the digest_text, digest, and schemaname of the queries that are currently routed to ReadySet. -pub fn find_queries_routed_to_readyset(conn: &mut PooledConn) -> Vec { - let rows: Vec = conn - .query(format!( - "SELECT digest FROM mysql_query_rules WHERE comment LIKE '{}%' OR comment LIKE '{}%'", - MIRROR_QUERY_TOKEN, DESTINATION_QUERY_TOKEN + ORDER BY s.sum_rows_sent DESC", + config.source_hostgroup, config.readyset_user )) - .expect("Failed to find queries routed to ReadySet"); + .expect("Failed to query proxysql_conn"); rows } @@ -67,65 +34,40 @@ pub fn replace_placeholders(query: &str) -> String { query.replace("?-?-?", "?") } -pub fn check_readyset_query_support( - conn: &mut PooledConn, - digest_text: &String, - schema: &String, -) -> Result { - conn.query_drop(format!("USE {}", schema)) - .expect("Failed to use schema"); - let row: Option<(String, String, String)> = - conn.query_first(format!("EXPLAIN CREATE CACHE FROM {}", digest_text))?; - match row { - Some((_, _, value)) => Ok(value == "yes" || value == "cached"), - None => Ok(false), - } -} - -pub fn cache_query( - conn: &mut PooledConn, - digest_text: &String, - digest: &String, -) -> Result { - conn.query_drop(format!("CREATE CACHE d_{} FROM {}", digest, digest_text)) - .expect("Failed to create readyset cache"); - Ok(true) -} - pub fn add_query_rule( - conn: &mut PooledConn, + conn: &mut Conn, digest: &String, config: &Config, ) -> Result { let datetime_now: DateTime = Local::now(); let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); if config.warmup_time.is_some() { - conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); + conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Mirror by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules"); messages::print_info("Inserted warm-up rule"); } else { - conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); + conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Added by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules"); messages::print_info("Inserted destination rule"); } Ok(true) } -pub fn load_query_rules(conn: &mut PooledConn) -> Result { +pub fn load_query_rules(conn: &mut Conn) -> Result { conn.query_drop("LOAD MYSQL QUERY RULES TO RUNTIME") .expect("Failed to load query rules"); Ok(true) } -pub fn save_query_rules(conn: &mut PooledConn) -> Result { +pub fn save_query_rules(conn: &mut Conn) -> Result { conn.query_drop("SAVE MYSQL QUERY RULES TO DISK") .expect("Failed to load query rules"); Ok(true) } -pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result { +pub fn adjust_mirror_rules(conn: &mut Conn, config: &Config) -> Result { let mut updated_rules = false; let datetime_now: DateTime = Local::now(); let tz = datetime_now.format("%z").to_string(); let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); - let rows: Vec<(u16, String)> = conn.query(format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{}: ____-__-__ __:__:__';", MIRROR_QUERY_TOKEN)).expect("Failed to select mirror rules"); + let rows: Vec<(u16, String)> = conn.query("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE 'Mirror by readyset scheduler at: ____-__-__ __:__:__';").expect("Failed to select mirror rules"); for (rule_id, comment) in rows { let datetime_mirror_str = comment .split("Mirror by readyset scheduler at:") @@ -157,22 +99,21 @@ pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result, ) { + if hosts.is_empty() { + return; + } let mut queries_added_or_change = adjust_mirror_rules(proxysql_conn, config).unwrap(); - let mut current_queries_digest: Vec = find_queries_routed_to_readyset(proxysql_conn); let rows: Vec<(String, String, String)> = find_queries_to_cache(proxysql_conn, config); for (digest_text, digest, schema) in rows { - if current_queries_digest.len() > config.number_of_queries as usize { - break; - } let digest_text = replace_placeholders(&digest_text); messages::print_info(format!("Going to test query support for {}", digest_text).as_str()); - let supported = check_readyset_query_support(readyset_conn, &digest_text, &schema); + let supported = &hosts[0].check_query_support(&digest_text, &schema); match supported { Ok(true) => { messages::print_info( @@ -181,10 +122,11 @@ pub fn query_discovery( .as_str(), ); queries_added_or_change = true; - cache_query(readyset_conn, &digest_text, &digest) - .expect("Failed to create readyset cache"); + hosts.iter_mut().for_each(|host| { + host.cache_query(&digest_text) + .expect("Failed to create readyset cache"); + }); add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule"); - current_queries_digest.push(digest); } Ok(false) => { messages::print_info("Query is not supported"); diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index 3ea9e13..0000000 --- a/src/server.rs +++ /dev/null @@ -1,71 +0,0 @@ -use core::fmt; - -use crate::{config::Config, messages}; -use mysql::{prelude::Queryable, PooledConn}; - -#[allow(dead_code)] -pub enum ServerStatus { - //backend server is fully operational - Online, - //backend sever is temporarily taken out of use because of either too many connection errors in a time that was too short, or the replication lag exceeded the allowed threshold - Shunned, - //when a server is put into OFFLINE_SOFT mode, no new connections are created toward that server, while the existing connections are kept until they are returned to the connection pool or destructed. In other words, connections are kept in use until multiplexing is enabled again, for example when a transaction is completed. This makes it possible to gracefully detach a backend as long as multiplexing is efficient - OfflineSoft, - //when a server is put into OFFLINE_HARD mode, no new connections are created toward that server and the existing **free **connections are ** immediately dropped**, while backend connections currently associated with a client session are dropped as soon as the client tries to use them. This is equivalent to deleting the server from a hostgroup. Internally, setting a server in OFFLINE_HARD status is equivalent to deleting the server - OfflineHard, -} - -impl fmt::Display for ServerStatus { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ServerStatus::Online => write!(f, "ONLINE"), - ServerStatus::Shunned => write!(f, "SHUNNED"), - ServerStatus::OfflineSoft => write!(f, "OFFLINE_SOFT"), - ServerStatus::OfflineHard => write!(f, "OFFLINE_HARD"), - } - } -} - -pub fn check_readyset_is_ready(rs_conn: &mut PooledConn) -> Result { - let rows: Vec<(String, String)> = rs_conn.query("SHOW READYSET STATUS").unwrap_or(vec![]); - for (field, value) in rows { - if field == "Snapshot Status" { - return Ok(value == "Completed"); - } - } - Ok(false) -} - -pub fn change_server_status( - ps_conn: &mut PooledConn, - config: &Config, - server_status: ServerStatus, -) -> Result { - let where_clause = format!( - "WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}", - config.readyset_hostgroup, config.readyset_host, config.readyset_port - ); - let select_query = format!("SELECT status FROM runtime_mysql_servers {}", where_clause); - let status: Option = ps_conn.query_first(select_query)?; - if status.as_ref().unwrap() != &server_status.to_string() { - messages::print_info( - format!( - "Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}", - config.readyset_hostgroup, - config.readyset_host, - config.readyset_port, - status.unwrap(), - server_status - ) - .as_str(), - ); - ps_conn.query_drop(format!( - "UPDATE mysql_servers SET status = '{}' {}", - server_status, where_clause - ))?; - ps_conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME")?; - ps_conn.query_drop("SAVE MYSQL SERVERS TO DISK")?; - } - - Ok(true) -}