Skip to content

Commit

Permalink
Merge pull request #1507 from jqnatividad/fetch-get_response_refactor
Browse files Browse the repository at this point in the history
`fetch` & `fetchpost`: get_response refactor
  • Loading branch information
jqnatividad authored Dec 29, 2023
2 parents 907d079 + 6fb1bf9 commit 2fd23ee
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 140 deletions.
126 changes: 56 additions & 70 deletions src/cmd/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ use governor::{
use indicatif::{HumanCount, MultiProgress, ProgressBar, ProgressDrawTarget};
use log::{
debug, error, info, log_enabled, warn,
Level::{Debug, Info, Trace, Warn},
Level::{Debug, Trace, Warn},
};
use rand::Rng;
use redis;
Expand Down Expand Up @@ -425,7 +425,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
)
},
};
info!("RATE LIMIT: {rate_limit}");
debug!("RATE LIMIT: {rate_limit}");

let http_headers: HeaderMap = {
let mut map = HeaderMap::with_capacity(args.flag_http_header.len() + 1);
Expand Down Expand Up @@ -888,6 +888,27 @@ fn get_redis_response(
}))
}

/// Get the value of a header from the ratelimit API response
/// given its name or its x-name
pub fn get_ratelimit_header_value<'a>(
api_respheader: &'a HeaderMap,
header: &'a str,
x_header: &'a str,
) -> Option<&'a HeaderValue> {
api_respheader
.get(header)
.or_else(|| api_respheader.get(x_header))
}

/// Parse the value of a header from the ratelimit API response
/// return sentinel value if the header is not found or if the value is not a valid u64
/// return 1 if the value is 0
pub fn parse_ratelimit_header_value(value: Option<&HeaderValue>, sentinel_value: u64) -> u64 {
value.map_or(sentinel_value, |v| {
atoi_simd::parse_pos::<u64>(v.to_str().unwrap().as_bytes()).unwrap_or(1)
})
}

#[inline]
fn get_response(
url: &str,
Expand Down Expand Up @@ -929,7 +950,7 @@ fn get_response(
};
},
};
info!("Using URL: {valid_url}");
debug!("Using URL: {valid_url}");

// wait until RateLimiter gives Okay or we timeout
let mut limiter_total_wait: u64;
Expand All @@ -942,6 +963,8 @@ fn get_response(
let mut api_status;
let mut api_respheader = HeaderMap::new();

let debug_flag = log_enabled!(Debug);

// request with --max-retries
'retry: loop {
// check the rate-limiter
Expand All @@ -950,15 +973,14 @@ fn get_response(
limiter_total_wait += MINIMUM_WAIT_MS;
thread::sleep(MIN_WAIT);
if limiter_total_wait > governor_timeout_ms {
info!("rate limit timed out after {limiter_total_wait} ms");
debug!("rate limit timed out after {limiter_total_wait} ms");
break;
} else if limiter_total_wait == MINIMUM_WAIT_MS {
info!("throttling...");
debug!("throttling...");
}
}
if log_enabled!(Info) && limiter_total_wait > 0 && limiter_total_wait <= governor_timeout_ms
{
info!("throttled for {limiter_total_wait} ms");
if debug_flag && limiter_total_wait > 0 && limiter_total_wait <= governor_timeout_ms {
debug!("throttled for {limiter_total_wait} ms");
}

// send the actual request
Expand Down Expand Up @@ -1034,40 +1056,31 @@ fn get_response(
|| api_respheader.contains_key("x-ratelimit-limit")
|| api_respheader.contains_key("retry-after")))
{
let mut ratelimit_remaining = api_respheader.get("ratelimit-remaining");
if ratelimit_remaining.is_none() {
let temp_var = api_respheader.get("x-ratelimit-remaining");
if temp_var.is_some() {
ratelimit_remaining = temp_var;
}
}
let mut ratelimit_reset = api_respheader.get("ratelimit-reset");
if ratelimit_reset.is_none() {
let temp_var = api_respheader.get("x-ratelimit-reset");
if temp_var.is_some() {
ratelimit_reset = temp_var;
}
}
let ratelimit_remaining = get_ratelimit_header_value(
&api_respheader,
"ratelimit-remaining",
"x-ratelimit-remaining",
);

let ratelimit_reset =
get_ratelimit_header_value(&api_respheader, "ratelimit-reset", "x-ratelimit-reset");

// some APIs add the "-second" suffix to ratelimit fields
let mut ratelimit_remaining_sec = api_respheader.get("ratelimit-remaining-second");
if ratelimit_remaining_sec.is_none() {
let temp_var = api_respheader.get("x-ratelimit-remaining-second");
if temp_var.is_some() {
ratelimit_remaining_sec = temp_var;
}
}
let mut ratelimit_reset_sec = api_respheader.get("ratelimit-reset-second");
if ratelimit_reset_sec.is_none() {
let temp_var = api_respheader.get("x-ratelimit-reset-second");
if temp_var.is_some() {
ratelimit_reset_sec = temp_var;
}
}
let ratelimit_remaining_sec = get_ratelimit_header_value(
&api_respheader,
"ratelimit-remaining-second",
"x-ratelimit-remaining-second",
);

let ratelimit_reset_sec = get_ratelimit_header_value(
&api_respheader,
"ratelimit-reset-second",
"x-ratelimit-reset-second",
);

let retry_after = api_respheader.get("retry-after");

if log_enabled!(Debug) {
if debug_flag {
let rapidapi_proxy_response = api_respheader.get("X-RapidAPI-Proxy-Response");

debug!(
Expand All @@ -1080,40 +1093,13 @@ fn get_response(

// if there's a ratelimit_remaining field in the response header, get it
// otherwise, set remaining to sentinel value 9999
let remaining = ratelimit_remaining.map_or_else(
|| {
if let Some(ratelimit_remaining_sec) = ratelimit_remaining_sec {
let remaining_sec_str = ratelimit_remaining_sec.to_str().unwrap();
atoi_simd::parse_pos::<u64>(remaining_sec_str.as_bytes()).unwrap_or(1)
} else {
9999_u64
}
},
|ratelimit_remaining| {
let remaining_str = ratelimit_remaining.to_str().unwrap();
atoi_simd::parse_pos::<u64>(remaining_str.as_bytes()).unwrap_or(1)
},
);
let remaining =
parse_ratelimit_header_value(ratelimit_remaining.or(ratelimit_remaining_sec), 9999);

// if there's a ratelimit_reset field in the response header, get it
// otherwise, set reset to sentinel value 0
let mut reset_secs = ratelimit_reset.map_or_else(
|| {
if let Some(ratelimit_reset_sec) = ratelimit_reset_sec {
let reset_sec_str = ratelimit_reset_sec.to_str().unwrap();
atoi_simd::parse_pos::<u64>(reset_sec_str.as_bytes()).unwrap_or(1)
} else {
// sleep for at least 1 second if we get an API error,
// even if there is no ratelimit_reset header,
// otherwise return 0
u64::from(error_flag)
}
},
|ratelimit_reset| {
let reset_str = ratelimit_reset.to_str().unwrap();
atoi_simd::parse_pos::<u64>(reset_str.as_bytes()).unwrap_or(1)
},
);
let mut reset_secs =
parse_ratelimit_header_value(ratelimit_reset.or(ratelimit_reset_sec), 0);

// if there's a retry_after field in the response header, get it
// and set reset to it
Expand Down Expand Up @@ -1146,7 +1132,7 @@ fn get_response(
let pause_time =
(reset_secs * 1001) + (retries as u64 * rand::thread_rng().gen_range(10..30));

info!(
debug!(
"sleeping for {pause_time} ms until ratelimit is reset/retry_after has elapsed"
);
thread::sleep(time::Duration::from_millis(pause_time));
Expand All @@ -1157,7 +1143,7 @@ fn get_response(
break 'retry;
}
retries += 1;
info!("retrying {retries}...");
debug!("retrying {retries}...");
} else {
// there's no request error or ratelimits nor retry-after
break 'retry;
Expand Down
105 changes: 35 additions & 70 deletions src/cmd/fetchpost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ use governor::{
use indicatif::{HumanCount, MultiProgress, ProgressBar, ProgressDrawTarget};
use log::{
debug, error, info, log_enabled, warn,
Level::{Debug, Info, Trace, Warn},
Level::{Debug, Trace, Warn},
};
use rand::Rng;
use redis;
Expand All @@ -205,7 +205,7 @@ use simdutf8::basic::from_utf8;
use url::Url;

use crate::{
cmd::fetch::process_jql,
cmd::fetch::{get_ratelimit_header_value, parse_ratelimit_header_value, process_jql},
config::{Config, Delimiter},
select::SelectColumns,
util, CliError, CliResult,
Expand Down Expand Up @@ -946,7 +946,7 @@ fn get_response(
};
},
};
info!("Using URL: {valid_url}");
debug!("Using URL: {valid_url}");

// wait until RateLimiter gives Okay or we timeout
let mut limiter_total_wait: u64;
Expand All @@ -959,6 +959,8 @@ fn get_response(
let mut api_status;
let mut api_respheader = HeaderMap::new();

let debug_flag = log_enabled!(Debug);

// request with --max-retries
'retry: loop {
// check the rate-limiter
Expand All @@ -967,15 +969,14 @@ fn get_response(
limiter_total_wait += MINIMUM_WAIT_MS;
thread::sleep(MIN_WAIT);
if limiter_total_wait > governor_timeout_ms {
info!("rate limit timed out after {limiter_total_wait} ms");
debug!("rate limit timed out after {limiter_total_wait} ms");
break;
} else if limiter_total_wait == MINIMUM_WAIT_MS {
info!("throttling...");
debug!("throttling...");
}
}
if log_enabled!(Info) && limiter_total_wait > 0 && limiter_total_wait <= governor_timeout_ms
{
info!("throttled for {limiter_total_wait} ms");
if debug_flag && limiter_total_wait > 0 && limiter_total_wait <= governor_timeout_ms {
debug!("throttled for {limiter_total_wait} ms");
}

// send the actual request
Expand Down Expand Up @@ -1065,40 +1066,31 @@ fn get_response(
|| api_respheader.contains_key("x-ratelimit-limit")
|| api_respheader.contains_key("retry-after")))
{
let mut ratelimit_remaining = api_respheader.get("ratelimit-remaining");
if ratelimit_remaining.is_none() {
let temp_var = api_respheader.get("x-ratelimit-remaining");
if temp_var.is_some() {
ratelimit_remaining = temp_var;
}
}
let mut ratelimit_reset = api_respheader.get("ratelimit-reset");
if ratelimit_reset.is_none() {
let temp_var = api_respheader.get("x-ratelimit-reset");
if temp_var.is_some() {
ratelimit_reset = temp_var;
}
}
let ratelimit_remaining = get_ratelimit_header_value(
&api_respheader,
"ratelimit-remaining",
"x-ratelimit-remaining",
);

let ratelimit_reset =
get_ratelimit_header_value(&api_respheader, "ratelimit-reset", "x-ratelimit-reset");

// some APIs add the "-second" suffix to ratelimit fields
let mut ratelimit_remaining_sec = api_respheader.get("ratelimit-remaining-second");
if ratelimit_remaining_sec.is_none() {
let temp_var = api_respheader.get("x-ratelimit-remaining-second");
if temp_var.is_some() {
ratelimit_remaining_sec = temp_var;
}
}
let mut ratelimit_reset_sec = api_respheader.get("ratelimit-reset-second");
if ratelimit_reset_sec.is_none() {
let temp_var = api_respheader.get("x-ratelimit-reset-second");
if temp_var.is_some() {
ratelimit_reset_sec = temp_var;
}
}
let ratelimit_remaining_sec = get_ratelimit_header_value(
&api_respheader,
"ratelimit-remaining-second",
"x-ratelimit-remaining-second",
);

let ratelimit_reset_sec = get_ratelimit_header_value(
&api_respheader,
"ratelimit-reset-second",
"x-ratelimit-reset-second",
);

let retry_after = api_respheader.get("retry-after");

if log_enabled!(Debug) {
if debug_flag {
let rapidapi_proxy_response = api_respheader.get("X-RapidAPI-Proxy-Response");

debug!(
Expand All @@ -1111,40 +1103,13 @@ fn get_response(

// if there's a ratelimit_remaining field in the response header, get it
// otherwise, set remaining to sentinel value 9999
let remaining = ratelimit_remaining.map_or_else(
|| {
if let Some(ratelimit_remaining_sec) = ratelimit_remaining_sec {
let remaining_sec_str = ratelimit_remaining_sec.to_str().unwrap();
remaining_sec_str.parse::<u64>().unwrap_or(1)
} else {
9999_u64
}
},
|ratelimit_remaining| {
let remaining_str = ratelimit_remaining.to_str().unwrap();
remaining_str.parse::<u64>().unwrap_or(1)
},
);
let remaining =
parse_ratelimit_header_value(ratelimit_remaining.or(ratelimit_remaining_sec), 9999);

// if there's a ratelimit_reset field in the response header, get it
// otherwise, set reset to sentinel value 0
let mut reset_secs = ratelimit_reset.map_or_else(
|| {
if let Some(ratelimit_reset_sec) = ratelimit_reset_sec {
let reset_sec_str = ratelimit_reset_sec.to_str().unwrap();
reset_sec_str.parse::<u64>().unwrap_or(1)
} else {
// sleep for at least 1 second if we get an API error,
// even if there is no ratelimit_reset header,
// otherwise return 0
u64::from(error_flag)
}
},
|ratelimit_reset| {
let reset_str = ratelimit_reset.to_str().unwrap();
reset_str.parse::<u64>().unwrap_or(1)
},
);
let mut reset_secs =
parse_ratelimit_header_value(ratelimit_reset.or(ratelimit_reset_sec), 0);

// if there's a retry_after field in the response header, get it
// and set reset to it
Expand Down Expand Up @@ -1175,7 +1140,7 @@ fn get_response(
// more breathing room before we hit it again
let pause_time =
(reset_secs * 1001) + (retries as u64 * rand::thread_rng().gen_range(10..30));
info!(
debug!(
"sleeping for {pause_time} ms until ratelimit is reset/retry_after has elapsed"
);

Expand All @@ -1187,7 +1152,7 @@ fn get_response(
break 'retry;
}
retries += 1;
info!("retrying {retries}...");
debug!("retrying {retries}...");
} else {
// there's no request error or ratelimits nor retry-after
break 'retry;
Expand Down

0 comments on commit 2fd23ee

Please sign in to comment.