diff --git a/build/docker-compose.yml b/build/docker-compose.yml index 77bf090..a7c89c8 100644 --- a/build/docker-compose.yml +++ b/build/docker-compose.yml @@ -29,6 +29,7 @@ services: - "6034:6034" environment: - UPSTREAM_DB_URL=mysql://root:noria@172.17.0.1:3306/noria + - LISTEN_ADDRESS=0.0.0.0:3307 depends_on: mysql-master: condition: service_healthy diff --git a/build/test.cnf b/build/test.cnf new file mode 100644 index 0000000..cc3b7d0 --- /dev/null +++ b/build/test.cnf @@ -0,0 +1,12 @@ +proxysql_user = 'radmin' +proxysql_password = 'radmin' +proxysql_host = '127.0.0.1' +proxysql_port = 6032 +readyset_user = 'root' +readyset_password = 'noria' +source_hostgroup = 1 +readyset_hostgroup = 2 +warmup_time = 60 +lock_file = '/tmp/readyset_scheduler.lock' +operation_mode="All" +number_of_queries=10 diff --git a/readyset_proxysql_scheduler.cnf b/readyset_proxysql_scheduler.cnf index a73df77..8599758 100644 --- a/readyset_proxysql_scheduler.cnf +++ b/readyset_proxysql_scheduler.cnf @@ -4,8 +4,6 @@ proxysql_host = '127.0.0.1' proxysql_port = 6032 readyset_user = 'root' readyset_password = 'root' -readyset_host = '127.0.0.1' -readyset_port = 3307 source_hostgroup = 11 readyset_hostgroup = 99 warmup_time = 60 diff --git a/src/config.rs b/src/config.rs index 4e9ce06..90ae6f1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,8 +41,6 @@ pub struct Config { pub proxysql_port: u16, pub readyset_user: String, pub readyset_password: String, - pub readyset_host: String, - pub readyset_port: u16, pub source_hostgroup: u16, pub readyset_hostgroup: u16, pub warmup_time: Option, diff --git a/src/health_check.rs b/src/health_check.rs index 37b2930..bde914d 100644 --- a/src/health_check.rs +++ b/src/health_check.rs @@ -1,27 +1,22 @@ use crate::{ - config, messages, - server::{self, ServerStatus}, + config, + hosts::{Host, HostStatus}, + messages, }; -pub fn health_check( - proxysql_conn: &mut mysql::PooledConn, - config: &config::Config, - readyset_conn: &mut mysql::PooledConn, -) { - match server::check_readyset_is_ready(readyset_conn) { +pub fn health_check(proxysql_conn: &mut mysql::Conn, config: &config::Config, host: &mut Host) { + match host.check_readyset_is_ready() { Ok(ready) => { if ready { - let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Online); + let _ = host.change_status(proxysql_conn, config, HostStatus::Online); } else { messages::print_info("Readyset is still running Snapshot."); - let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned); - std::process::exit(0); + let _ = host.change_status(proxysql_conn, config, HostStatus::Shunned); } } Err(e) => { messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str()); - let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned); - std::process::exit(1); + let _ = host.change_status(proxysql_conn, config, HostStatus::Shunned); } }; } diff --git a/src/hosts.rs b/src/hosts.rs new file mode 100644 index 0000000..fea8d5d --- /dev/null +++ b/src/hosts.rs @@ -0,0 +1,323 @@ +use crate::{config::Config, messages}; +use core::fmt; +use mysql::{prelude::Queryable, Conn, OptsBuilder}; + +#[allow(dead_code)] +/// Defines the possible status of a host +#[derive(PartialEq, Clone, Copy)] +pub enum HostStatus { + ///backend server is fully operational + Online, + //backend sever is temporarily taken out of use because of either too many connection errors in a time that was too short, or the replication lag exceeded the allowed threshold + Shunned, + //when a server is put into OFFLINE_SOFT mode, no new connections are created toward that server, while the existing connections are kept until they are returned to the connection pool or destructed. In other words, connections are kept in use until multiplexing is enabled again, for example when a transaction is completed. This makes it possible to gracefully detach a backend as long as multiplexing is efficient + OfflineSoft, + //when a server is put into OFFLINE_HARD mode, no new connections are created toward that server and the existing **free **connections are ** immediately dropped**, while backend connections currently associated with a client session are dropped as soon as the client tries to use them. This is equivalent to deleting the server from a hostgroup. Internally, setting a server in OFFLINE_HARD status is equivalent to deleting the server + OfflineHard, +} + +impl fmt::Display for HostStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + HostStatus::Online => write!(f, "ONLINE"), + HostStatus::Shunned => write!(f, "SHUNNED"), + HostStatus::OfflineSoft => write!(f, "OFFLINE_SOFT"), + HostStatus::OfflineHard => write!(f, "OFFLINE_HARD"), + } + } +} + +impl From for HostStatus { + fn from(s: String) -> Self { + match s.to_uppercase().as_str() { + "ONLINE" => HostStatus::Online, + "SHUNNED" => HostStatus::Shunned, + "OFFLINE_SOFT" => HostStatus::OfflineSoft, + "OFFLINE_HARD" => HostStatus::OfflineHard, + _ => HostStatus::Online, + } + } +} + +/// Represents a Readyset host +pub struct Host { + hostname: String, + port: u16, + status: HostStatus, + conn: Option, +} + +impl Host { + /// Creates a new `Host` instance with the given hostname and port. + /// The connection to the host is established during the creation of the instance. + /// If the connection fails, the `conn` field will be `None`. + /// If the connection is successful, the `conn` field will contain the connection. + /// + /// # Arguments + /// + /// * `hostname` - The hostname of the host. + /// * `port` - The port number of the host. + /// + /// # Returns + /// + /// A new `Host` instance. + pub fn new(hostname: String, port: u16, status: String, config: &Config) -> Host { + let conn = match Conn::new( + OptsBuilder::new() + .ip_or_hostname(Some(hostname.clone())) + .tcp_port(port) + .user(Some(config.readyset_user.clone())) + .pass(Some(config.readyset_password.clone())) + .prefer_socket(false), + ) { + Ok(conn) => conn, + Err(err) => { + eprintln!("Failed to establish connection: {}", err); + return Host { + hostname, + port, + status: HostStatus::from(status), + conn: None, + }; + } + }; + + Host { + hostname, + port, + status: HostStatus::from(status), + conn: Some(conn), + } + } + + /// Gets the hostname of the host. + /// + /// # Returns + /// + /// The hostname of the host. + pub fn get_hostname(&self) -> &String { + &self.hostname + } + + /// Gets the port of the host. + /// + /// # Returns + /// + /// The port of the host. + pub fn get_port(&self) -> u16 { + self.port + } + + /// Gets the status of the host. + /// + /// # Returns + /// + /// The status of the host. + fn get_status(&self) -> HostStatus { + self.status + } + + /// Checks if the host is online. + /// + /// # Returns + /// + /// true if the host is online, false otherwise. + pub fn is_online(&self) -> bool { + self.status == HostStatus::Online + } + + /// Checks if the Readyset host is ready to serve traffic. + /// This is done by querying the SHOW READYSET STATUS command. + /// + /// # Returns + /// + /// true if the host is ready, false otherwise. + pub fn check_readyset_is_ready(&mut self) -> Result { + match &mut self.conn { + Some(conn) => { + let rows: Vec<(String, String)> = + conn.query("SHOW READYSET STATUS").unwrap_or(vec![]); + for (field, value) in rows { + if field == "Snapshot Status" { + return Ok(value == "Completed"); + } + } + Ok(false) + } + None => Err(mysql::Error::IoError(std::io::Error::new( + std::io::ErrorKind::Other, + "Connection to Readyset host is not established", + ))), + } + } + + /// Checks if the host supports the given query. + /// This is done by querying the EXPLAIN CREATE CACHE FROM command. + /// + /// # Arguments + /// + /// * `digest_text` - The digest text of the query. + /// * `schema` - The schema of the query. + /// + /// # Returns + /// + /// true if the host supports the query, false otherwise. + pub fn check_query_support( + &mut self, + digest_text: &String, + schema: &String, + ) -> Result { + match &mut self.conn { + Some(conn) => { + conn.query_drop(format!("USE {}", schema)) + .expect("Failed to use schema"); + let row: Option<(String, String, String)> = + conn.query_first(format!("EXPLAIN CREATE CACHE FROM {}", digest_text))?; + match row { + Some((_, _, value)) => Ok(value == "yes" || value == "cached"), + None => Ok(false), + } + } + None => Ok(false), + } + } + + /// Caches the given query on the host. + /// This is done by executing the CREATE CACHE FROM command. + /// + /// # Arguments + /// + /// * `digest_text` - The digest text of the query. + /// + /// # Returns + /// + /// true if the query was cached successfully, false otherwise. + pub fn cache_query( + &mut self, + digest_text: &String, + digest: &String, + ) -> Result { + match &mut self.conn { + None => return Ok(false), + Some(conn) => { + conn.query_drop(format!("CREATE CACHE d_{} FROM {}", digest, digest_text)) + .expect("Failed to create readyset cache"); + } + } + Ok(true) + } + + /// Changes the status of the host in the ProxySQL mysql_servers table. + /// The status is set to the given `status`. + pub fn change_status( + &mut self, + ps_conn: &mut Conn, + config: &Config, + status: HostStatus, + ) -> Result { + let where_clause = format!( + "WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}", + config.readyset_hostgroup, + self.get_hostname(), + self.get_port() + ); + if self.status != status { + messages::print_info( + format!( + "Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}", + config.readyset_hostgroup, + self.get_hostname(), + self.get_port(), + self.get_status(), + status + ) + .as_str(), + ); + self.status = status; + ps_conn.query_drop(format!( + "UPDATE mysql_servers SET status = '{}' {}", + self.get_status(), + where_clause + ))?; + ps_conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME")?; + ps_conn.query_drop("SAVE MYSQL SERVERS TO DISK")?; + } + + Ok(true) + } +} + +/// Represents a list of Readyset hosts +pub struct Hosts { + hosts: Vec, +} + +impl From> for Hosts { + fn from(hosts: Vec) -> Self { + Hosts { hosts } + } +} + +impl Hosts { + /// Fetches the hosts from the ProxySQL mysql_servers table. + /// + /// # Arguments + /// + /// * `proxysql_conn` - The connection to the ProxySQL instance. + /// * `config` - The configuration object. + /// + /// # Returns + /// + /// A vector of `Host` instances. + pub fn new<'a>(proxysql_conn: &'a mut Conn, config: &'a Config) -> Self { + let query = format!( + "SELECT hostname, port, status, comment FROM mysql_servers WHERE hostgroup_id = {}", + config.readyset_hostgroup + ); + let results: Vec<(String, u16, String, String)> = proxysql_conn.query(query).unwrap(); + results + .into_iter() + .filter_map(|(hostname, port, status, comment)| { + if comment.to_lowercase().contains("readyset") { + Some(Host::new(hostname, port, status, config)) + } else { + None + } + }) + .collect::>() + .into() + } + + /// Gets a mutable iterator over the hosts. + /// + /// # Returns + /// + /// A mutable iterator over the hosts. + pub fn iter_mut(&mut self) -> core::slice::IterMut { + self.hosts.iter_mut() + } + + /// Mutate self by retaining only the hosts that are online. + pub fn retain_online(&mut self) { + self.hosts.retain(|host| host.is_online()); + } + + /// Checks if the hosts list is empty. + /// + /// # Returns + /// + /// true if the hosts list is empty, false otherwise. + pub fn is_empty(&self) -> bool { + self.hosts.is_empty() + } + + /// Gets a mutable reference for the first host in the list. + /// + /// # Returns + /// + /// A reference to the first host in the list. + /// If the list is empty, the function returns None. + pub fn first_mut(&mut self) -> Option<&mut Host> { + self.hosts.first_mut() + } +} diff --git a/src/main.rs b/src/main.rs index 16e8e59..f428101 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,16 @@ mod config; mod health_check; +mod hosts; mod messages; mod queries; -mod server; use clap::Parser; use config::read_config_file; -use mysql::OptsBuilder; -use mysql::{Pool, PoolConstraints, PoolOpts}; +use hosts::Hosts; +use mysql::{Conn, OptsBuilder}; use file_guard::Lock; use queries::query_discovery; -use server::ServerStatus; use std::fs::OpenOptions; /// Readyset ProxySQL Scheduler @@ -63,61 +62,37 @@ fn main() { std::process::exit(1); } }; - - let proxysql_pool = Pool::new( + let mut proxysql_conn = Conn::new( OptsBuilder::new() .ip_or_hostname(Some(config.proxysql_host.as_str())) .tcp_port(config.proxysql_port) .user(Some(config.proxysql_user.as_str())) .pass(Some(config.proxysql_password.as_str())) - .prefer_socket(false) - .pool_opts( - PoolOpts::default() - .with_reset_connection(false) - .with_constraints(PoolConstraints::new(1, 1).unwrap()), - ), + .prefer_socket(false), ) .unwrap(); - let mut proxysql_conn = proxysql_pool.get_conn().unwrap(); - - let readyset_pool = match Pool::new( - OptsBuilder::new() - .ip_or_hostname(Some(config.readyset_host.as_str())) - .tcp_port(config.readyset_port) - .user(Some(config.readyset_user.as_str())) - .pass(Some(config.readyset_password.as_str())) - .prefer_socket(false) - .pool_opts( - PoolOpts::default() - .with_reset_connection(false) - .with_constraints(PoolConstraints::new(1, 1).unwrap()), - ), - ) { - Ok(conn) => conn, - Err(e) => { - messages::print_error(format!("Cannot connect to Readyset: {}.", e).as_str()); - let _ = - server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned); - std::process::exit(1); - } - }; - let mut readyset_conn = readyset_pool.get_conn().unwrap(); let running_mode = match config.operation_mode { Some(mode) => mode, None => config::OperationMode::All, }; + let mut hosts = Hosts::new(&mut proxysql_conn, &config); + if running_mode == config::OperationMode::HealthCheck || running_mode == config::OperationMode::All { - health_check::health_check(&mut proxysql_conn, &config, &mut readyset_conn) + hosts.iter_mut().for_each(|host| { + health_check::health_check(&mut proxysql_conn, &config, host); + }); } + // retain only healthy hosts + hosts.retain_online(); if running_mode == config::OperationMode::QueryDiscovery || running_mode == config::OperationMode::All { - query_discovery(&mut proxysql_conn, &config, &mut readyset_conn); + query_discovery(&mut proxysql_conn, &config, &mut hosts); } messages::print_info("Finished readyset_scheduler"); diff --git a/src/queries.rs b/src/queries.rs index e8fbe4a..8aca89b 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -1,8 +1,9 @@ use chrono::{DateTime, Local}; -use mysql::{prelude::Queryable, PooledConn}; +use mysql::{prelude::Queryable, Conn}; use crate::{ config::{self, Config}, + hosts::Hosts, messages, }; @@ -12,15 +13,12 @@ const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at"; /// This function is used to find queries that are not cached in ReadySet and are not in the mysql_query_rules table. /// /// # Arguments -/// * `conn` - A mutable reference to a pooled connection to ProxySQL. +/// * `conn` - A reference to a connection to ProxySQL. /// * `config` - A reference to the configuration struct. /// /// # Returns /// A vector of tuples containing the digest_text, digest, and schema name of the queries that are not cached in ReadySet and are not in the mysql_query_rules table. -pub fn find_queries_to_cache( - conn: &mut PooledConn, - config: &Config, -) -> Vec<(String, String, String)> { +pub fn find_queries_to_cache(conn: &mut Conn, config: &Config) -> Vec<(String, String, String)> { let rows: Vec<(String, String, String)> = conn .query(format!( "SELECT s.digest_text, s.digest, s.schemaname @@ -47,11 +45,11 @@ pub fn find_queries_to_cache( /// This function is used to check the current list of queries routed to Readyset. /// /// # Arguments -/// * `conn` - A mutable reference to a pooled connection to ProxySQL. +/// * `conn` - A reference to a connection to ProxySQL. /// /// # Returns /// A vector of tuples containing the digest_text, digest, and schemaname of the queries that are currently routed to ReadySet. -pub fn find_queries_routed_to_readyset(conn: &mut PooledConn) -> Vec { +pub fn find_queries_routed_to_readyset(conn: &mut Conn) -> Vec { let rows: Vec = conn .query(format!( "SELECT digest FROM mysql_query_rules WHERE comment LIKE '{}%' OR comment LIKE '{}%'", @@ -67,33 +65,8 @@ pub fn replace_placeholders(query: &str) -> String { query.replace("?-?-?", "?") } -pub fn check_readyset_query_support( - conn: &mut PooledConn, - digest_text: &String, - schema: &String, -) -> Result { - conn.query_drop(format!("USE {}", schema)) - .expect("Failed to use schema"); - let row: Option<(String, String, String)> = - conn.query_first(format!("EXPLAIN CREATE CACHE FROM {}", digest_text))?; - match row { - Some((_, _, value)) => Ok(value == "yes" || value == "cached"), - None => Ok(false), - } -} - -pub fn cache_query( - conn: &mut PooledConn, - digest_text: &String, - digest: &String, -) -> Result { - conn.query_drop(format!("CREATE CACHE d_{} FROM {}", digest, digest_text)) - .expect("Failed to create readyset cache"); - Ok(true) -} - pub fn add_query_rule( - conn: &mut PooledConn, + conn: &mut Conn, digest: &String, config: &Config, ) -> Result { @@ -109,18 +82,18 @@ pub fn add_query_rule( Ok(true) } -pub fn load_query_rules(conn: &mut PooledConn) -> Result { +pub fn load_query_rules(conn: &mut Conn) -> Result { conn.query_drop("LOAD MYSQL QUERY RULES TO RUNTIME") .expect("Failed to load query rules"); Ok(true) } -pub fn save_query_rules(conn: &mut PooledConn) -> Result { +pub fn save_query_rules(conn: &mut Conn) -> Result { conn.query_drop("SAVE MYSQL QUERY RULES TO DISK") .expect("Failed to load query rules"); Ok(true) } -pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result { +pub fn adjust_mirror_rules(conn: &mut Conn, config: &Config) -> Result { let mut updated_rules = false; let datetime_now: DateTime = Local::now(); let tz = datetime_now.format("%z").to_string(); @@ -156,11 +129,11 @@ pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result = find_queries_routed_to_readyset(proxysql_conn); @@ -172,7 +145,10 @@ pub fn query_discovery( } let digest_text = replace_placeholders(&digest_text); messages::print_info(format!("Going to test query support for {}", digest_text).as_str()); - let supported = check_readyset_query_support(readyset_conn, &digest_text, &schema); + let supported = hosts + .first_mut() + .unwrap() + .check_query_support(&digest_text, &schema); // Safe to unwrap because we checked if hosts is empty match supported { Ok(true) => { messages::print_info( @@ -181,8 +157,10 @@ pub fn query_discovery( .as_str(), ); queries_added_or_change = true; - cache_query(readyset_conn, &digest_text, &digest) - .expect("Failed to create readyset cache"); + hosts.iter_mut().for_each(|host| { + host.cache_query(&digest_text, &digest) + .expect("Failed to create readyset cache"); + }); add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule"); current_queries_digest.push(digest); } diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index 3ea9e13..0000000 --- a/src/server.rs +++ /dev/null @@ -1,71 +0,0 @@ -use core::fmt; - -use crate::{config::Config, messages}; -use mysql::{prelude::Queryable, PooledConn}; - -#[allow(dead_code)] -pub enum ServerStatus { - //backend server is fully operational - Online, - //backend sever is temporarily taken out of use because of either too many connection errors in a time that was too short, or the replication lag exceeded the allowed threshold - Shunned, - //when a server is put into OFFLINE_SOFT mode, no new connections are created toward that server, while the existing connections are kept until they are returned to the connection pool or destructed. In other words, connections are kept in use until multiplexing is enabled again, for example when a transaction is completed. This makes it possible to gracefully detach a backend as long as multiplexing is efficient - OfflineSoft, - //when a server is put into OFFLINE_HARD mode, no new connections are created toward that server and the existing **free **connections are ** immediately dropped**, while backend connections currently associated with a client session are dropped as soon as the client tries to use them. This is equivalent to deleting the server from a hostgroup. Internally, setting a server in OFFLINE_HARD status is equivalent to deleting the server - OfflineHard, -} - -impl fmt::Display for ServerStatus { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ServerStatus::Online => write!(f, "ONLINE"), - ServerStatus::Shunned => write!(f, "SHUNNED"), - ServerStatus::OfflineSoft => write!(f, "OFFLINE_SOFT"), - ServerStatus::OfflineHard => write!(f, "OFFLINE_HARD"), - } - } -} - -pub fn check_readyset_is_ready(rs_conn: &mut PooledConn) -> Result { - let rows: Vec<(String, String)> = rs_conn.query("SHOW READYSET STATUS").unwrap_or(vec![]); - for (field, value) in rows { - if field == "Snapshot Status" { - return Ok(value == "Completed"); - } - } - Ok(false) -} - -pub fn change_server_status( - ps_conn: &mut PooledConn, - config: &Config, - server_status: ServerStatus, -) -> Result { - let where_clause = format!( - "WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}", - config.readyset_hostgroup, config.readyset_host, config.readyset_port - ); - let select_query = format!("SELECT status FROM runtime_mysql_servers {}", where_clause); - let status: Option = ps_conn.query_first(select_query)?; - if status.as_ref().unwrap() != &server_status.to_string() { - messages::print_info( - format!( - "Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}", - config.readyset_hostgroup, - config.readyset_host, - config.readyset_port, - status.unwrap(), - server_status - ) - .as_str(), - ); - ps_conn.query_drop(format!( - "UPDATE mysql_servers SET status = '{}' {}", - server_status, where_clause - ))?; - ps_conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME")?; - ps_conn.query_drop("SAVE MYSQL SERVERS TO DISK")?; - } - - Ok(true) -}