diff --git a/readyset_proxysql_scheduler.cnf b/readyset_proxysql_scheduler.cnf index 7e7c089..a73df77 100644 --- a/readyset_proxysql_scheduler.cnf +++ b/readyset_proxysql_scheduler.cnf @@ -10,4 +10,5 @@ source_hostgroup = 11 readyset_hostgroup = 99 warmup_time = 60 lock_file = '/tmp/readyset_scheduler.lock' -operation_mode=all +operation_mode="All" +number_of_queries=10 diff --git a/src/config.rs b/src/config.rs index 58eda48..4e9ce06 100644 --- a/src/config.rs +++ b/src/config.rs @@ -48,6 +48,7 @@ pub struct Config { pub warmup_time: Option, pub lock_file: Option, pub operation_mode: Option, + pub number_of_queries: u16, } pub fn read_config_file(path: &str) -> Result { diff --git a/src/queries.rs b/src/queries.rs index b53d479..9d23b48 100644 --- a/src/queries.rs +++ b/src/queries.rs @@ -6,6 +6,17 @@ use crate::{ messages, }; +const MIRROR_QUERY_TOKEN: &str = "Mirror by readyset scheduler at"; +const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at"; + +/// This function is used to find queries that are not cached in ReadySet and are not in the mysql_query_rules table. +/// +/// # Arguments +/// * `conn` - A mutable reference to a pooled connection to ProxySQL. +/// * `config` - A reference to the configuration struct. +/// +/// # Returns +/// A vector of tuples containing the digest_text, digest, and schema name of the queries that are not cached in ReadySet and are not in the mysql_query_rules table. pub fn find_queries_to_cache( conn: &mut PooledConn, config: &Config, @@ -23,10 +34,30 @@ pub fn find_queries_to_cache( AND digest_text NOT LIKE '%?=?%' AND s.sum_rows_sent > 0 AND q.rule_id IS NULL - ORDER BY s.sum_rows_sent DESC", - config.source_hostgroup, config.readyset_user + ORDER BY s.sum_rows_sent DESC + LIMIT {}", + config.source_hostgroup, + config.readyset_user, + (config.number_of_queries * 2) )) - .expect("Failed to query proxysql_conn"); + .expect("Failed to find queries to cache"); + rows +} + +/// This function is used to check the current list of queries routed to Readyset. +/// +/// # Arguments +/// * `conn` - A mutable reference to a pooled connection to ProxySQL. +/// +/// # Returns +/// A vector of tuples containing the digest_text, digest, and schemaname of the queries that are currently routed to ReadySet. +pub fn find_queries_routed_to_readyset(conn: &mut PooledConn) -> Vec { + let rows: Vec = conn + .query(format!( + "SELECT digest FROM mysql_query_rules WHERE comment LIKE '{}%' OR comment LIKE '{}%'", + MIRROR_QUERY_TOKEN, DESTINATION_QUERY_TOKEN + )) + .expect("Failed to find queries routed to ReadySet"); rows } @@ -65,10 +96,10 @@ pub fn add_query_rule( let datetime_now: DateTime = Local::now(); let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); if config.warmup_time.is_some() { - conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Mirror by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules"); + conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); messages::print_info("Inserted warm-up rule"); } else { - conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Added by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules"); + conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules"); messages::print_info("Inserted destination rule"); } Ok(true) @@ -90,7 +121,7 @@ pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result = Local::now(); let tz = datetime_now.format("%z").to_string(); let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S"); - let rows: Vec<(u16, String)> = conn.query("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE 'Mirror by readyset scheduler at: ____-__-__ __:__:__';").expect("Failed to select mirror rules"); + let rows: Vec<(u16, String)> = conn.query(format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{}: ____-__-__ __:__:__';", MIRROR_QUERY_TOKEN)).expect("Failed to select mirror rules"); for (rule_id, comment) in rows { let datetime_mirror_str = comment .split("Mirror by readyset scheduler at:") @@ -128,9 +159,13 @@ pub fn query_discovery( ) { let mut queries_added_or_change = adjust_mirror_rules(proxysql_conn, config).unwrap(); + let mut current_queries_digest: Vec = find_queries_routed_to_readyset(proxysql_conn); let rows: Vec<(String, String, String)> = find_queries_to_cache(proxysql_conn, config); for (digest_text, digest, schema) in rows { + if current_queries_digest.len() > config.number_of_queries as usize { + break; + } let digest_text = replace_placeholders(&digest_text); messages::print_info(format!("Going to test query support for {}", digest_text).as_str()); let supported = check_readyset_query_support(readyset_conn, &digest_text, &schema); @@ -144,6 +179,7 @@ pub fn query_discovery( queries_added_or_change = true; cache_query(readyset_conn, &digest_text).expect("Failed to create readyset cache"); add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule"); + current_queries_digest.push(digest); } Ok(false) => { messages::print_info("Query is not supported");