Skip to content

Commit

Permalink
Merge pull request #13 from readysettech/issue-4
Browse files Browse the repository at this point in the history
Limit number of caches
  • Loading branch information
altmannmarcelo authored Aug 14, 2024
2 parents 199aebd + c7e0c44 commit 57ff6b0
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
3 changes: 2 additions & 1 deletion readyset_proxysql_scheduler.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ source_hostgroup = 11
readyset_hostgroup = 99
warmup_time = 60
lock_file = '/tmp/readyset_scheduler.lock'
operation_mode=all
operation_mode="All"
number_of_queries=10
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct Config {
pub warmup_time: Option<u16>,
pub lock_file: Option<String>,
pub operation_mode: Option<OperationMode>,
pub number_of_queries: u16,
}

pub fn read_config_file(path: &str) -> Result<String, std::io::Error> {
Expand Down
48 changes: 42 additions & 6 deletions src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ use crate::{
messages,
};

const MIRROR_QUERY_TOKEN: &str = "Mirror by readyset scheduler at";
const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at";

/// This function is used to find queries that are not cached in ReadySet and are not in the mysql_query_rules table.
///
/// # Arguments
/// * `conn` - A mutable reference to a pooled connection to ProxySQL.
/// * `config` - A reference to the configuration struct.
///
/// # Returns
/// A vector of tuples containing the digest_text, digest, and schema name of the queries that are not cached in ReadySet and are not in the mysql_query_rules table.
pub fn find_queries_to_cache(
conn: &mut PooledConn,
config: &Config,
Expand All @@ -23,10 +34,30 @@ pub fn find_queries_to_cache(
AND digest_text NOT LIKE '%?=?%'
AND s.sum_rows_sent > 0
AND q.rule_id IS NULL
ORDER BY s.sum_rows_sent DESC",
config.source_hostgroup, config.readyset_user
ORDER BY s.sum_rows_sent DESC
LIMIT {}",
config.source_hostgroup,
config.readyset_user,
(config.number_of_queries * 2)
))
.expect("Failed to query proxysql_conn");
.expect("Failed to find queries to cache");
rows
}

/// This function is used to check the current list of queries routed to Readyset.
///
/// # Arguments
/// * `conn` - A mutable reference to a pooled connection to ProxySQL.
///
/// # Returns
/// A vector of tuples containing the digest_text, digest, and schemaname of the queries that are currently routed to ReadySet.
pub fn find_queries_routed_to_readyset(conn: &mut PooledConn) -> Vec<String> {
let rows: Vec<String> = conn
.query(format!(
"SELECT digest FROM mysql_query_rules WHERE comment LIKE '{}%' OR comment LIKE '{}%'",
MIRROR_QUERY_TOKEN, DESTINATION_QUERY_TOKEN
))
.expect("Failed to find queries routed to ReadySet");
rows
}

Expand Down Expand Up @@ -65,10 +96,10 @@ pub fn add_query_rule(
let datetime_now: DateTime<Local> = Local::now();
let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S");
if config.warmup_time.is_some() {
conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Mirror by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules");
conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules");
messages::print_info("Inserted warm-up rule");
} else {
conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Added by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules");
conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules");
messages::print_info("Inserted destination rule");
}
Ok(true)
Expand All @@ -90,7 +121,7 @@ pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result<boo
let datetime_now: DateTime<Local> = Local::now();
let tz = datetime_now.format("%z").to_string();
let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S");
let rows: Vec<(u16, String)> = conn.query("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE 'Mirror by readyset scheduler at: ____-__-__ __:__:__';").expect("Failed to select mirror rules");
let rows: Vec<(u16, String)> = conn.query(format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{}: ____-__-__ __:__:__';", MIRROR_QUERY_TOKEN)).expect("Failed to select mirror rules");
for (rule_id, comment) in rows {
let datetime_mirror_str = comment
.split("Mirror by readyset scheduler at:")
Expand Down Expand Up @@ -128,9 +159,13 @@ pub fn query_discovery(
) {
let mut queries_added_or_change = adjust_mirror_rules(proxysql_conn, config).unwrap();

let mut current_queries_digest: Vec<String> = find_queries_routed_to_readyset(proxysql_conn);
let rows: Vec<(String, String, String)> = find_queries_to_cache(proxysql_conn, config);

for (digest_text, digest, schema) in rows {
if current_queries_digest.len() > config.number_of_queries as usize {
break;
}
let digest_text = replace_placeholders(&digest_text);
messages::print_info(format!("Going to test query support for {}", digest_text).as_str());
let supported = check_readyset_query_support(readyset_conn, &digest_text, &schema);
Expand All @@ -144,6 +179,7 @@ pub fn query_discovery(
queries_added_or_change = true;
cache_query(readyset_conn, &digest_text).expect("Failed to create readyset cache");
add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule");
current_queries_digest.push(digest);
}
Ok(false) => {
messages::print_info("Query is not supported");
Expand Down

0 comments on commit 57ff6b0

Please sign in to comment.