Skip to content

Commit

Permalink
Add host list capability
Browse files Browse the repository at this point in the history
This commit adds the ability to read a list of hosts from mysql_servers
table instead of a single host from config file.

This merges the old server.rs into the hosts file.
This also remove the Pooled connection to a single connection.

Fixes: #5
  • Loading branch information
altmannmarcelo committed Sep 3, 2024
1 parent 7226fbb commit 474adff
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 208 deletions.
1 change: 1 addition & 0 deletions build/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ services:
- "6034:6034"
environment:
- UPSTREAM_DB_URL=mysql://root:[email protected]:3306/noria
- LISTEN_ADDRESS=0.0.0.0:3307
depends_on:
mysql-master:
condition: service_healthy
Expand Down
12 changes: 12 additions & 0 deletions build/test.cnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
proxysql_user = 'radmin'
proxysql_password = 'radmin'
proxysql_host = '127.0.0.1'
proxysql_port = 6032
readyset_user = 'root'
readyset_password = 'noria'
source_hostgroup = 1
readyset_hostgroup = 2
warmup_time = 60
lock_file = '/tmp/readyset_scheduler.lock'
operation_mode="All"
number_of_queries=10
4 changes: 1 addition & 3 deletions readyset_proxysql_scheduler.cnf
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
proxysql_user = 'admin'
proxysql_user = 'radmin'
proxysql_password = 'admin'
proxysql_host = '127.0.0.1'
proxysql_port = 6032
readyset_user = 'root'
readyset_password = 'root'
readyset_host = '127.0.0.1'
readyset_port = 3307
source_hostgroup = 11
readyset_hostgroup = 99
warmup_time = 60
Expand Down
2 changes: 0 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ pub struct Config {
pub proxysql_port: u16,
pub readyset_user: String,
pub readyset_password: String,
pub readyset_host: String,
pub readyset_port: u16,
pub source_hostgroup: u16,
pub readyset_hostgroup: u16,
pub warmup_time: Option<u16>,
Expand Down
21 changes: 8 additions & 13 deletions src/health_check.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
use crate::{
config, messages,
server::{self, ServerStatus},
config,
hosts::{Host, HostStatus},
messages,
};

pub fn health_check(
proxysql_conn: &mut mysql::PooledConn,
config: &config::Config,
readyset_conn: &mut mysql::PooledConn,
) {
match server::check_readyset_is_ready(readyset_conn) {
pub fn health_check(proxysql_conn: &mut mysql::Conn, config: &config::Config, host: &mut Host) {
match host.check_readyset_is_ready() {
Ok(ready) => {
if ready {
let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Online);
let _ = host.change_status(proxysql_conn, config, HostStatus::Online);
} else {
messages::print_info("Readyset is still running Snapshot.");
let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned);
std::process::exit(0);
let _ = host.change_status(proxysql_conn, config, HostStatus::Shunned);
}
}
Err(e) => {
messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str());
let _ = server::change_server_status(proxysql_conn, config, ServerStatus::Shunned);
std::process::exit(1);
let _ = host.change_status(proxysql_conn, config, HostStatus::Shunned);
}
};
}
274 changes: 274 additions & 0 deletions src/hosts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
use crate::{config::Config, messages};
use core::fmt;
use mysql::{prelude::Queryable, Conn, OptsBuilder};

#[allow(dead_code)]
/// Defines the possible status of a host
#[derive(PartialEq, Clone, Copy)]
pub enum HostStatus {
///backend server is fully operational
Online,
//backend sever is temporarily taken out of use because of either too many connection errors in a time that was too short, or the replication lag exceeded the allowed threshold
Shunned,
//when a server is put into OFFLINE_SOFT mode, no new connections are created toward that server, while the existing connections are kept until they are returned to the connection pool or destructed. In other words, connections are kept in use until multiplexing is enabled again, for example when a transaction is completed. This makes it possible to gracefully detach a backend as long as multiplexing is efficient
OfflineSoft,
//when a server is put into OFFLINE_HARD mode, no new connections are created toward that server and the existing **free **connections are ** immediately dropped**, while backend connections currently associated with a client session are dropped as soon as the client tries to use them. This is equivalent to deleting the server from a hostgroup. Internally, setting a server in OFFLINE_HARD status is equivalent to deleting the server
OfflineHard,
}

impl fmt::Display for HostStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
HostStatus::Online => write!(f, "ONLINE"),
HostStatus::Shunned => write!(f, "SHUNNED"),
HostStatus::OfflineSoft => write!(f, "OFFLINE_SOFT"),
HostStatus::OfflineHard => write!(f, "OFFLINE_HARD"),
}
}
}

impl From<String> for HostStatus {
fn from(s: String) -> Self {
match s.to_uppercase().as_str() {
"ONLINE" => HostStatus::Online,
"SHUNNED" => HostStatus::Shunned,
"OFFLINE_SOFT" => HostStatus::OfflineSoft,
"OFFLINE_HARD" => HostStatus::OfflineHard,
_ => HostStatus::Online,
}
}
}

/// Represents a Readyset host
pub struct Host {
hostname: String,
port: u16,
status: HostStatus,
conn: Option<Conn>,
}

impl Host {
/// Creates a new `Host` instance with the given hostname and port.
/// The connection to the host is established during the creation of the instance.
/// If the connection fails, the `conn` field will be `None`.
/// If the connection is successful, the `conn` field will contain the connection.
///
/// # Arguments
///
/// * `hostname` - The hostname of the host.
/// * `port` - The port number of the host.
///
/// # Returns
///
/// A new `Host` instance.
pub fn new(hostname: String, port: u16, status: String, config: &Config) -> Host {
let conn = match Conn::new(
OptsBuilder::new()
.ip_or_hostname(Some(hostname.clone()))
.tcp_port(port)
.user(Some(config.readyset_user.clone()))
.pass(Some(config.readyset_password.clone()))
.prefer_socket(false),
) {
Ok(conn) => conn,
Err(err) => {
eprintln!("Failed to establish connection: {}", err);
return Host {
hostname,
port,
status: HostStatus::from(status),
conn: None,
};
}
};

Host {
hostname,
port,
status: HostStatus::from(status),
conn: Some(conn),
}
}

/// Gets the hostname of the host.
///
/// # Returns
///
/// The hostname of the host.
pub fn get_hostname(&self) -> &String {
&self.hostname
}

/// Gets the port of the host.
///
/// # Returns
///
/// The port of the host.
pub fn get_port(&self) -> u16 {
self.port
}

/// Gets the status of the host.
///
/// # Returns
///
/// The status of the host.
fn get_status(&self) -> HostStatus {
self.status
}

/// Checks if the host is online.
///
/// # Returns
///
/// true if the host is online, false otherwise.
pub fn is_online(&self) -> bool {
self.status == HostStatus::Online
}

/// Checks if the Readyset host is ready to serve traffic.
/// This is done by querying the SHOW READYSET STATUS command.
///
/// # Returns
///
/// true if the host is ready, false otherwise.
pub fn check_readyset_is_ready(&mut self) -> Result<bool, mysql::Error> {
match &mut self.conn {
Some(conn) => {
let rows: Vec<(String, String)> =
conn.query("SHOW READYSET STATUS").unwrap_or(vec![]);
for (field, value) in rows {
if field == "Snapshot Status" {
return Ok(value == "Completed");
}
}
Ok(false)
}
None => Err(mysql::Error::from(mysql::Error::IoError(
std::io::Error::new(
std::io::ErrorKind::Other,
"Connection to host is not established",
),
))),
}
}

/// Checks if the host supports the given query.
/// This is done by querying the EXPLAIN CREATE CACHE FROM command.
///
/// # Arguments
///
/// * `digest_text` - The digest text of the query.
/// * `schema` - The schema of the query.
///
/// # Returns
///
/// true if the host supports the query, false otherwise.
pub fn check_query_support(
&mut self,
digest_text: &String,
schema: &String,
) -> Result<bool, mysql::Error> {
match &mut self.conn {
Some(conn) => {
conn.query_drop(format!("USE {}", schema))
.expect("Failed to use schema");
let row: Option<(String, String, String)> =
conn.query_first(format!("EXPLAIN CREATE CACHE FROM {}", digest_text))?;
match row {
Some((_, _, value)) => Ok(value == "yes" || value == "cached"),
None => Ok(false),
}
}
None => Ok(false),
}
}

/// Caches the given query on the host.
/// This is done by executing the CREATE CACHE FROM command.
///
/// # Arguments
///
/// * `digest_text` - The digest text of the query.
///
/// # Returns
///
/// true if the query was cached successfully, false otherwise.
pub fn cache_query(&mut self, digest_text: &String) -> Result<bool, mysql::Error> {
match &mut self.conn {
None => return Ok(false),
Some(conn) => {
conn.query_drop(format!("CREATE CACHE FROM {}", digest_text))
.expect("Failed to create readyset cache");
}
}
Ok(true)
}

/// Changes the status of the host in the ProxySQL mysql_servers table.
/// The status is set to the given `status`.
pub fn change_status(
&mut self,
ps_conn: &mut Conn,
config: &Config,
status: HostStatus,
) -> Result<bool, mysql::Error> {
let where_clause = format!(
"WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}",
config.readyset_hostgroup,
self.get_hostname(),
self.get_port()
);
if self.status != status {
messages::print_info(
format!(
"Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}",
config.readyset_hostgroup,
self.get_hostname(),
self.get_port(),
self.get_status(),
status
)
.as_str(),
);
self.status = status;
ps_conn.query_drop(format!(
"UPDATE mysql_servers SET status = '{}' {}",
self.get_status(),
where_clause
))?;
ps_conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME")?;
ps_conn.query_drop("SAVE MYSQL SERVERS TO DISK")?;
}

Ok(true)
}
}

/// Fetches the hosts from the ProxySQL mysql_servers table.
///
/// # Arguments
///
/// * `proxysql_conn` - The connection to the ProxySQL instance.
/// * `config` - The configuration object.
///
/// # Returns
///
/// A vector of `Host` instances.
pub fn get_hosts<'a>(proxysql_conn: &'a mut Conn, config: &'a Config) -> Vec<Host> {
let query = format!(
"SELECT hostname, port, status, comment FROM mysql_servers WHERE hostgroup_id = {}",
config.readyset_hostgroup
);
let results: Vec<(String, u16, String, String)> = proxysql_conn.query(query).unwrap();
results
.into_iter()
.filter_map(|(hostname, port, status, comment)| {
if comment.to_lowercase().contains("readyset") {
Some(Host::new(hostname, port, status, config))
} else {
None
}
})
.collect::<Vec<Host>>()
}
Loading

0 comments on commit 474adff

Please sign in to comment.