Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit number of caches #13

Merged
merged 1 commit into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion readyset_proxysql_scheduler.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ source_hostgroup = 11
readyset_hostgroup = 99
warmup_time = 60
lock_file = '/tmp/readyset_scheduler.lock'
operation_mode=all
operation_mode="All"
number_of_queries=10
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct Config {
pub warmup_time: Option<u16>,
pub lock_file: Option<String>,
pub operation_mode: Option<OperationMode>,
pub number_of_queries: u16,
}

pub fn read_config_file(path: &str) -> Result<String, std::io::Error> {
Expand Down
48 changes: 42 additions & 6 deletions src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ use crate::{
messages,
};

const MIRROR_QUERY_TOKEN: &str = "Mirror by readyset scheduler at";
const DESTINATION_QUERY_TOKEN: &str = "Added by readyset scheduler at";

/// This function is used to find queries that are not cached in ReadySet and are not in the mysql_query_rules table.
///
/// # Arguments
/// * `conn` - A mutable reference to a pooled connection to ProxySQL.
/// * `config` - A reference to the configuration struct.
///
/// # Returns
/// A vector of tuples containing the digest_text, digest, and schema name of the queries that are not cached in ReadySet and are not in the mysql_query_rules table.
pub fn find_queries_to_cache(
conn: &mut PooledConn,
config: &Config,
Expand All @@ -23,10 +34,30 @@ pub fn find_queries_to_cache(
AND digest_text NOT LIKE '%?=?%'
AND s.sum_rows_sent > 0
AND q.rule_id IS NULL
ORDER BY s.sum_rows_sent DESC",
config.source_hostgroup, config.readyset_user
ORDER BY s.sum_rows_sent DESC
LIMIT {}",
config.source_hostgroup,
config.readyset_user,
(config.number_of_queries * 2)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly a stupid question, but why 2x? Just to have extras in case some can't be cached?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No stupid. This is indeed to account for query not supported. We get twice as much queries we should cache and start testing them, stopping when we reach number_of_queries config.

))
.expect("Failed to query proxysql_conn");
.expect("Failed to find queries to cache");
rows
}

/// This function is used to check the current list of queries routed to Readyset.
///
/// # Arguments
/// * `conn` - A mutable reference to a pooled connection to ProxySQL.
///
/// # Returns
/// A vector of tuples containing the digest_text, digest, and schemaname of the queries that are currently routed to ReadySet.
pub fn find_queries_routed_to_readyset(conn: &mut PooledConn) -> Vec<String> {
let rows: Vec<String> = conn
.query(format!(
"SELECT digest FROM mysql_query_rules WHERE comment LIKE '{}%' OR comment LIKE '{}%'",
MIRROR_QUERY_TOKEN, DESTINATION_QUERY_TOKEN
))
.expect("Failed to find queries routed to ReadySet");
rows
}

Expand Down Expand Up @@ -65,10 +96,10 @@ pub fn add_query_rule(
let datetime_now: DateTime<Local> = Local::now();
let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S");
if config.warmup_time.is_some() {
conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Mirror by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules");
conn.query_drop(format!("INSERT INTO mysql_query_rules (username, mirror_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, MIRROR_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules");
messages::print_info("Inserted warm-up rule");
} else {
conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, 'Added by readyset scheduler at: {}')", config.readyset_user, config.readyset_hostgroup, digest, date_formatted)).expect("Failed to insert into mysql_query_rules");
conn.query_drop(format!("INSERT INTO mysql_query_rules (username, destination_hostgroup, active, digest, apply, comment) VALUES ('{}', {}, 1, '{}', 1, '{}: {}')", config.readyset_user, config.readyset_hostgroup, digest, DESTINATION_QUERY_TOKEN, date_formatted)).expect("Failed to insert into mysql_query_rules");
messages::print_info("Inserted destination rule");
}
Ok(true)
Expand All @@ -90,7 +121,7 @@ pub fn adjust_mirror_rules(conn: &mut PooledConn, config: &Config) -> Result<boo
let datetime_now: DateTime<Local> = Local::now();
let tz = datetime_now.format("%z").to_string();
let date_formatted = datetime_now.format("%Y-%m-%d %H:%M:%S");
let rows: Vec<(u16, String)> = conn.query("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE 'Mirror by readyset scheduler at: ____-__-__ __:__:__';").expect("Failed to select mirror rules");
let rows: Vec<(u16, String)> = conn.query(format!("SELECT rule_id, comment FROM mysql_query_rules WHERE comment LIKE '{}: ____-__-__ __:__:__';", MIRROR_QUERY_TOKEN)).expect("Failed to select mirror rules");
for (rule_id, comment) in rows {
let datetime_mirror_str = comment
.split("Mirror by readyset scheduler at:")
Expand Down Expand Up @@ -128,9 +159,13 @@ pub fn query_discovery(
) {
let mut queries_added_or_change = adjust_mirror_rules(proxysql_conn, config).unwrap();

let mut current_queries_digest: Vec<String> = find_queries_routed_to_readyset(proxysql_conn);
let rows: Vec<(String, String, String)> = find_queries_to_cache(proxysql_conn, config);

for (digest_text, digest, schema) in rows {
if current_queries_digest.len() > config.number_of_queries as usize {
break;
}
let digest_text = replace_placeholders(&digest_text);
messages::print_info(format!("Going to test query support for {}", digest_text).as_str());
let supported = check_readyset_query_support(readyset_conn, &digest_text, &schema);
Expand All @@ -144,6 +179,7 @@ pub fn query_discovery(
queries_added_or_change = true;
cache_query(readyset_conn, &digest_text).expect("Failed to create readyset cache");
add_query_rule(proxysql_conn, &digest, config).expect("Failed to add query rule");
current_queries_digest.push(digest);
}
Ok(false) => {
messages::print_info("Query is not supported");
Expand Down