Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support requeue-duration as parameter #95

Merged
merged 2 commits into from
Sep 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::k8s::K8s;

#[derive(Copy, Clone)]
pub struct RunoConfig {
pub(crate) k8s: K8s,
pub(crate) requeue_duration: u64,
}

impl RunoConfig {
pub fn build(k8s: K8s, requeue_duration: u64) -> RunoConfig {
RunoConfig {
k8s,
requeue_duration,
}
}
}
6 changes: 3 additions & 3 deletions src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn build_security_context() -> Option<SecurityContext> {
})
}

async fn create_or_replace(cj: CronJob, namespace: &str, k8s: &Arc<K8s>) {
async fn create_or_replace(cj: CronJob, namespace: &str, k8s: &K8s) {
let cronjobs: Api<CronJob> = Api::namespaced(K8s::get_client().await, namespace);
let c = cronjobs.create(&k8s.get_post_params(), &cj).await;
match c {
Expand All @@ -122,7 +122,7 @@ pub fn build_cron_name(obj: &Arc<Secret>, id: &str) -> String {
format!("runo-renewal-{}-{}", trunc_obj_name, id)
}

pub async fn update(obj: &Arc<Secret>, kube: &Arc<K8s>) {
pub async fn update(obj: &Arc<Secret>, k8s: &K8s) {
match obj.namespace() {
Some(namespace) => {
for id in id_iter(obj) {
Expand All @@ -133,7 +133,7 @@ pub async fn update(obj: &Arc<Secret>, kube: &Arc<K8s>) {
id
);
let cj = build_cronjob(obj, obj.name_any().as_str(), &id);
create_or_replace(cj, &namespace, kube).await
create_or_replace(cj, &namespace, k8s).await
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/k8s.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct K8s {
}

impl K8s {
pub fn new(dry_run: bool) -> K8s {
pub fn build(dry_run: bool) -> K8s {
if dry_run {
info!("Running runo in dry-run mode!")
}
Expand Down
11 changes: 8 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod annotations;
mod config;
mod cron;
mod errors;
mod http;
Expand All @@ -11,6 +12,7 @@ mod secrets;
use crate::k8s::K8s;
use anyhow::anyhow;
use clap::Parser;
use config::RunoConfig;
use tracing::info;

#[derive(Parser)]
Expand All @@ -22,6 +24,8 @@ struct MainArgs {
dry_run: bool,
#[clap(long, default_value_t = String::from("reconciliation"))]
mode: String,
#[clap(long, default_value_t = 300)]
requeue_duration: u64,
}

#[tokio::main]
Expand All @@ -31,12 +35,13 @@ async fn main() -> anyhow::Result<()> {
true => info!("Logging initialized.."),
false => panic!("Logging not initialized properly!. Exiting..."),
}
let k8s = K8s::new(args.dry_run);
let k8s = K8s::build(args.dry_run);
let config = RunoConfig::build(k8s, args.requeue_duration);
match args.mode.as_str() {
"reconciliation" => {
info!("Running runo in reconciliation mode.");
let http_server_result = http::run_http_server(args.http_port);
let reconciler = reconciler::run_with_reconciliation(k8s);
let reconciler = reconciler::run_with_reconciliation(config);
match http_server_result {
Ok(http_server) => {
tokio::join!(reconciler, http_server).1.unwrap();
Expand All @@ -47,7 +52,7 @@ async fn main() -> anyhow::Result<()> {
}
"one-shot" => {
info!("Running runo in one-shot mode.");
reconciler::run_one_shot(k8s).await;
reconciler::run_one_shot(config).await;
Ok(())
}
_ => Err(anyhow!("Mode is not supported!: {:?}", args.mode)),
Expand Down
76 changes: 50 additions & 26 deletions src/reconciler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::config::RunoConfig;
use crate::{cron, labels, secrets};
use k8s_openapi::api::core::v1::Secret;
use kube::runtime::controller::Action;
Expand All @@ -16,20 +17,22 @@ use tracing::info;
pub enum Error {}
pub type Result<T, E = Error> = std::result::Result<T, E>;

pub(crate) async fn reconcile(obj: Arc<Secret>, k8s: Arc<K8s>) -> Result<Action> {
pub(crate) async fn reconcile(obj: Arc<Secret>, config: Arc<RunoConfig>) -> Result<Action> {
info!("reconcile request: {}", obj.name_any());
if labels::managed_by_us(&obj) {
secrets::update(&obj, &k8s).await;
cron::update(&obj, &k8s).await
secrets::update(&obj, &config.k8s).await;
cron::update(&obj, &config.k8s).await
}
Ok(Action::requeue(Duration::from_secs(3600)))
Ok(Action::requeue(Duration::from_secs(
config.requeue_duration,
)))
}

pub(crate) fn error_policy(_object: Arc<Secret>, _err: &Error, _k8s: Arc<K8s>) -> Action {
pub(crate) fn error_policy(_object: Arc<Secret>, _err: &Error, _config: Arc<RunoConfig>) -> Action {
Action::requeue(Duration::from_secs(5))
}

pub async fn run_with_reconciliation(k8s: K8s) {
pub async fn run_with_reconciliation(config: RunoConfig) {
let client = K8s::get_client().await;
let secrets = Api::<Secret>::all(client);
let watcher_config = Config {
Expand All @@ -38,22 +41,23 @@ pub async fn run_with_reconciliation(k8s: K8s) {
};
Controller::new(secrets.clone(), watcher_config)
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(k8s))
.run(reconcile, error_policy, Arc::new(config))
.filter_map(|x| async move { std::result::Result::ok(x) })
.for_each(|_| futures::future::ready(()))
.await;
}

pub async fn run_one_shot(k8s: K8s) {
pub async fn run_one_shot(config: RunoConfig) {
let client = K8s::get_client().await;
let secrets = Api::<Secret>::all(client);
for secret in secrets.list(&ListParams::default()).await.unwrap() {
let _ = reconcile(Arc::new(secret), Arc::new(k8s)).await;
let _ = reconcile(Arc::new(secret), Arc::new(config)).await;
}
}

#[cfg(test)]
mod tests {
use crate::config::RunoConfig;
use crate::reconciler::reconcile;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
Expand Down Expand Up @@ -138,7 +142,9 @@ mod tests {
.await
.unwrap();
let client = Client::try_from(config).unwrap();
let k8s = Arc::new(K8s::new(false));

let k8s = K8s::build(false);
let runo_config = Arc::new(RunoConfig::build(k8s, 300));

let key_0 = String::from("v1.secret.runo.rocks/generate-0");
let value_0 = String::from("username");
Expand All @@ -158,7 +164,7 @@ mod tests {
assert!(secrets.get(secret_name).await.unwrap().data.is_none());

// reconcile it
reconcile(Arc::new(secret), k8s).await.unwrap();
reconcile(Arc::new(secret), runo_config).await.unwrap();

// Value for field username should be generated
assert!(secrets
Expand Down Expand Up @@ -191,7 +197,9 @@ mod tests {
.await
.unwrap();
let client = Client::try_from(config).unwrap();
let k8s = Arc::new(K8s::new(false));

let k8s = K8s::build(false);
let runo_config = Arc::new(RunoConfig::build(k8s, 300));

let key_0 = String::from("v1.secret.runo.rocks/generate-0");
let value_0 = String::from("username");
Expand All @@ -211,7 +219,7 @@ mod tests {
assert!(secrets.get(secret_name).await.unwrap().data.is_none());

// reconcile it
reconcile(Arc::new(secret), k8s).await.unwrap();
reconcile(Arc::new(secret), runo_config).await.unwrap();

// Data should still be empty
assert!(secrets.get(secret_name).await.unwrap().data.is_none());
Expand All @@ -228,7 +236,9 @@ mod tests {
.await
.unwrap();
let client = Client::try_from(config).unwrap();
let k8s = Arc::new(K8s::new(false));

let k8s = K8s::build(false);
let runo_config = Arc::new(RunoConfig::build(k8s, 300));

let key_1 = String::from("v1.secret.runo.rocks/generate-0");
let value_1 = String::from("username");
Expand All @@ -248,7 +258,7 @@ mod tests {
assert!(secrets.get(secret_name).await.unwrap().data.is_none());

// reconcile it
reconcile(Arc::new(secret), k8s).await.unwrap();
reconcile(Arc::new(secret), runo_config).await.unwrap();

// Value for field username should be generated and has length of 10
let secret = secrets.get(secret_name).await.unwrap().data.unwrap();
Expand All @@ -267,7 +277,9 @@ mod tests {
.await
.unwrap();
let client = Client::try_from(config).unwrap();
let k8s = Arc::new(K8s::new(false));

let k8s = K8s::build(false);
let runo_config = Arc::new(RunoConfig::build(k8s, 300));

let key_1 = String::from("v1.secret.runo.rocks/generate-0");
let value_1 = String::from("username");
Expand All @@ -287,7 +299,7 @@ mod tests {
assert!(secrets.get(secret_name).await.unwrap().data.is_none());

// reconcile it
reconcile(Arc::new(secret), k8s).await.unwrap();
reconcile(Arc::new(secret), runo_config).await.unwrap();

// Value for field username should be generated and match the charset
let secret = secrets.get(secret_name).await.unwrap().data.unwrap();
Expand All @@ -308,7 +320,9 @@ mod tests {
.await
.unwrap();
let client = Client::try_from(config).unwrap();
let k8s = Arc::new(K8s::new(false));

let k8s = K8s::build(false);
let runo_config = Arc::new(RunoConfig::build(k8s, 300));

let key_1 = String::from("v1.secret.runo.rocks/generate-0");
let value_1 = String::from("username");
Expand All @@ -328,7 +342,7 @@ mod tests {
assert!(secrets.get(secret_name).await.unwrap().data.is_none());

// reconcile it
reconcile(Arc::new(secret), k8s).await.unwrap();
reconcile(Arc::new(secret), runo_config).await.unwrap();

// Value for field username should be generated and match the pattern
let secret = secrets.get(secret_name).await.unwrap().data.unwrap();
Expand All @@ -349,7 +363,9 @@ mod tests {
.await
.unwrap();
let client = Client::try_from(config).unwrap();
let k8s = Arc::new(K8s::new(false));

let k8s = K8s::build(false);
let runo_config = Arc::new(RunoConfig::build(k8s, 300));

let key_1 = String::from("v1.secret.runo.rocks/generate-0");
let value_1 = String::from("username");
Expand All @@ -368,7 +384,7 @@ mod tests {
assert!(secrets.get(secret_name).await.unwrap().data.is_none());

// reconcile it
reconcile(Arc::new(secret.clone()), k8s.clone())
reconcile(Arc::new(secret.clone()), runo_config.clone())
.await
.unwrap();
let secret_before_cron = secrets.get(secret_name).await.unwrap().data.unwrap();
Expand All @@ -392,7 +408,9 @@ mod tests {
);

// reconcile again to renewal secret
reconcile(Arc::new(secret.clone()), k8s).await.unwrap();
reconcile(Arc::new(secret.clone()), runo_config)
.await
.unwrap();
let secret_after_cron = secrets.get(secret_name).await.unwrap().data.unwrap();
let username_after_cron = from_utf8(&secret_after_cron.get("username").unwrap().0).unwrap();
assert_ne!(username_before_cron, username_after_cron);
Expand All @@ -418,7 +436,9 @@ mod tests {
.await
.unwrap();
let client = Client::try_from(config).unwrap();
let k8s = Arc::new(K8s::new(true));

let k8s = K8s::build(true);
let runo_config = Arc::new(RunoConfig::build(k8s, 300));

let key_0 = String::from("v1.secret.runo.rocks/generate-0");
let value_0 = String::from("username");
Expand All @@ -433,7 +453,7 @@ mod tests {
assert!(secrets.get(secret_name).await.unwrap().data.is_none());

// reconcile it
reconcile(Arc::new(secret), k8s).await.unwrap();
reconcile(Arc::new(secret), runo_config).await.unwrap();

// Value for field username should not be generated
assert!(secrets.get(secret_name).await.unwrap().data.is_none());
Expand All @@ -451,7 +471,9 @@ mod tests {
.await
.unwrap();
let client = Client::try_from(config).unwrap();
let k8s = Arc::new(K8s::new(true));

let k8s = K8s::build(true);
let runo_config = Arc::new(RunoConfig::build(k8s, 300));

let key_0 = String::from("v1.secret.runo.rocks/generate-0");
let value_0 = String::from("username");
Expand All @@ -472,7 +494,9 @@ mod tests {
assert!(secrets.get(secret_name).await.unwrap().data.is_none());

// reconcile it
reconcile(Arc::new(secret.clone()), k8s).await.unwrap();
reconcile(Arc::new(secret.clone()), runo_config)
.await
.unwrap();

assert!(cronjobs
.get(build_cron_name(&Arc::new(secret), "0").as_str())
Expand Down
2 changes: 1 addition & 1 deletion src/secrets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ fn get_updated_secret(obj: &Arc<Secret>) -> Secret {
}
}

pub async fn update(obj: &Arc<Secret>, k8s: &Arc<K8s>) {
pub async fn update(obj: &Arc<Secret>, k8s: &K8s) {
let secrets: Api<Secret> =
Api::namespaced(K8s::get_client().await, obj.namespace().unwrap().as_str());
match secrets
Expand Down
12 changes: 12 additions & 0 deletions tests/test_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,15 @@ fn dry_run() {
.assert()
.interrupted();
}

#[test]
fn requeue_duration() {
let mut cmd = Command::cargo_bin("runo").unwrap();
cmd.arg("--requeue-duration")
.arg("10")
.arg("--http-port")
.arg("0")
.timeout(std::time::Duration::from_secs(1))
.assert()
.interrupted();
}
Loading