Skip to content

Commit

Permalink
Fix 0.2.9
Browse files Browse the repository at this point in the history
  • Loading branch information
sebt3 committed May 7, 2024
1 parent bc36dc3 commit d4a59e3
Show file tree
Hide file tree
Showing 17 changed files with 242 additions and 101 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions deploy/crd/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ spec:
jsonPath: .spec.url
name: url
type: string
- description: Git branch
jsonPath: .spec.branch
name: branch
type: string
- description: Update schedule
jsonPath: .spec.schedule
name: schedule
Expand Down
2 changes: 1 addition & 1 deletion k8s/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ k8s-openapi = { version = "0.21.0", features = ["latest"], default-features = fa
chrono = "0.4.24"
anyhow = "1.0.70"
sha256 = "1.1.2"
log = "0.4.17"
tracing = "0.1.37"

[dependencies.kube]
features = ["runtime", "client", "derive"]
Expand Down
1 change: 1 addition & 0 deletions k8s/src/distrib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl DistribComponent {
#[kube(kind = "Distrib", group = "vynil.solidite.fr", version = "v1")]
#[kube(status = "DistribStatus", shortname = "dist", printcolumn = r#"
{"name":"url", "type":"string", "description":"Git url", "jsonPath":".spec.url"},
{"name":"branch", "type":"string", "description":"Git branch", "jsonPath":".spec.branch"},
{"name":"schedule", "type":"string", "description":"Update schedule", "jsonPath":".spec.schedule"},
{"name":"last_updated", "type":"string", "description":"Last update date", "format": "date-time", "jsonPath":".status.last_updated"}"#)]
pub struct DistribSpec {
Expand Down
45 changes: 35 additions & 10 deletions k8s/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ pub struct InstallHandler {
api: Api<Install>,
}
impl InstallHandler {
#[must_use] pub fn new(cl: Client, ns: &str) -> InstallHandler {
#[must_use] pub fn new(cl: &Client, ns: &str) -> InstallHandler {
InstallHandler {
api: Api::namespaced(cl, ns),
api: Api::namespaced(cl.clone(), ns),
}
}
pub async fn have(&mut self, name: &str) -> bool {
Expand All @@ -33,9 +33,9 @@ pub struct DistribHandler {
api: Api<Distrib>,
}
impl DistribHandler {
#[must_use] pub fn new(cl: Client) -> DistribHandler {
#[must_use] pub fn new(cl: &Client) -> DistribHandler {
DistribHandler {
api: Api::all(cl),
api: Api::all(cl.clone()),
}
}
pub async fn have(&mut self, name: &str) -> bool {
Expand All @@ -53,14 +53,14 @@ impl DistribHandler {
}
}

use k8s_openapi::api::networking::v1::Ingress;
pub use k8s_openapi::api::networking::v1::Ingress;
pub struct IngressHandler {
api: Api<Ingress>,
}
impl IngressHandler {
#[must_use] pub fn new(cl: Client, ns: &str) -> IngressHandler {
#[must_use] pub fn new(cl: &Client, ns: &str) -> IngressHandler {
IngressHandler {
api: Api::namespaced(cl, ns),
api: Api::namespaced(cl.clone(), ns),
}
}
pub async fn have(&mut self, name: &str) -> bool {
Expand All @@ -78,14 +78,14 @@ impl IngressHandler {
}
}

use k8s_openapi::api::core::v1::Secret;
pub use k8s_openapi::api::core::v1::Secret;
pub struct SecretHandler {
api: Api<Secret>,
}
impl SecretHandler {
#[must_use] pub fn new(cl: Client, ns: &str) -> SecretHandler {
#[must_use] pub fn new(cl: &Client, ns: &str) -> SecretHandler {
SecretHandler {
api: Api::namespaced(cl, ns),
api: Api::namespaced(cl.clone(), ns),
}
}
pub async fn have(&mut self, name: &str) -> bool {
Expand All @@ -102,3 +102,28 @@ impl SecretHandler {
self.api.get(name).await
}
}

pub use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
pub struct CustomResourceDefinitionHandler {
api: Api<CustomResourceDefinition>,
}
impl CustomResourceDefinitionHandler {
#[must_use] pub fn new(cl: &Client) -> CustomResourceDefinitionHandler {
CustomResourceDefinitionHandler {
api: Api::all(cl.clone()),
}
}
pub async fn have(&mut self, name: &str) -> bool {
let lp = ListParams::default();
let list = self.api.list(&lp).await.unwrap();
for secret in list {
if secret.metadata.name.clone().unwrap() == name {
return true;
}
}
false
}
pub async fn get(&mut self, name: &str) -> Result<CustomResourceDefinition, kube::Error> {
self.api.get(name).await
}
}
4 changes: 4 additions & 0 deletions k8s/src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub const STATUS_DESTROYED: &str = "destroyed";
pub const STATUS_AGENT_STARTED: &str = "agent started";
pub const STATUS_MISSING_DIST: &str = "missing distribution";
pub const STATUS_MISSING_COMP: &str = "missing component";
pub const STATUS_CHECK_FAIL: &str = "Validations failed";
pub const STATUS_MISSING_PROV: &str = "missing provider config";
pub const STATUS_MISSING_DEPS: &str = "missing dependencies";
pub const STATUS_WAITING_DEPS: &str = "waiting dependencies";
Expand Down Expand Up @@ -181,6 +182,9 @@ impl Install {
pub async fn update_status_missing_component(&self, client: Client, manager: &str, errors: Vec<String>) -> Result<Install, kube::Error> {
self.update_status_typed(client, manager, errors, STATUS_MISSING_COMP).await
}
pub async fn update_status_check_failed(&self, client: Client, manager: &str, errors: Vec<String>) -> Result<Install, kube::Error> {
self.update_status_typed(client, manager, errors, STATUS_CHECK_FAIL).await
}
pub async fn update_status_missing_provider(&self, client: Client, manager: &str, errors: Vec<String>) -> Result<Install, kube::Error> {
self.update_status_typed(client, manager, errors, STATUS_MISSING_PROV).await
}
Expand Down
6 changes: 3 additions & 3 deletions k8s/src/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,15 @@ impl Component {
}
}
if skip {
log::warn!("Skipping option \"{}\" while updating type structure from default values", key);
log::info!("you should set \"type: array\" and a correct \"items\" definition for option \"{}\" so later validation will work", key);
tracing::warn!("Skipping option \"{}\" while updating type structure from default values", key);
tracing::info!("you should set \"type: array\" and a correct \"items\" definition for option \"{}\" so later validation will work", key);
} else if let Some(opts) = schema.schema_data.default.as_ref() {
// That option have a default value, update its properties
let final_schema = &schema_for_value!(opts).schema;
let objdef = serde_json::from_str(serde_json::to_string(final_schema)?.as_str())?;
merge_json( &mut val, objdef);
add_defaults(&mut val);
log::debug!("{key} after default : {:}", serde_yaml::to_string(&val).unwrap());
tracing::debug!("{key} after default : {:}", serde_yaml::to_string(&val).unwrap());
*self.options.get_mut(key.as_str()).unwrap() = val;
}
}
Expand Down
1 change: 1 addition & 0 deletions operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async-trait = "0.1.68"
either = "1.8.1"
base64 = "0.21.2"
json-patch = {version = "=1.2.0"}
rhai = { version = "1.12.0", features = ["sync", "serde"] }

[dependencies.kube]
features = ["runtime", "client", "derive"]
Expand Down
3 changes: 1 addition & 2 deletions operator/src/distrib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ impl Reconciler for Distrib {
}
}


#[must_use] pub fn error_policy(dist: Arc<Distrib>, error: &Error, ctx: Arc<Context>) -> Action {
warn!("reconcile failed: {:?}", error);
warn!("reconcile failed for {:?}: {:?}", dist.metadata.name, error);
ctx.metrics.dist_reconcile_failure(&dist, error);
Action::requeue(Duration::from_secs(5 * 60))
}
10 changes: 10 additions & 0 deletions operator/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ use k8s_openapi::api::core::v1::ObjectReference;
}
}

#[must_use] pub fn from_check(src_type: &str, src_name: &String, action: String, note: Option<String>) -> Event {
Event {
type_: EventType::Normal,
reason: format!("Reconciling `{}` {}", src_name, src_type),
note,
action,
secondary: None,
}
}

#[must_use] pub fn from_update(src_type: &str, src_name: &String, child_type: &str, child_name: &String, child: Option<ObjectReference>) -> Event {
Event {
type_: EventType::Normal,
Expand Down
63 changes: 42 additions & 21 deletions operator/src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use kube::{
//use base64::{Engine as _, engine::general_purpose};
use std::sync::Arc;
use tokio::time::Duration;
use tracing::{Span, debug, field, info, instrument, warn};
use tracing::{Span, debug, field, info, instrument, warn, error};
use async_trait::async_trait;
pub use k8s::install::{Install,InstallStatus, STATUS_INSTALLED};
pub use k8s::distrib::{Distrib, ComponentDependency};
Expand Down Expand Up @@ -64,6 +64,10 @@ impl Reconciler for Install {
let bootstrap = jobs.get("vynil-bootstrap").await.unwrap();
if let Some(status) = bootstrap.status {
if status.completion_time.is_none() {
warn!("Will not trigger auto-install before the bootstrap job is completed, requeue");
recorder.publish(
events::from_check("Install", &name, "Bootstrap in progress, requeue".to_string(), None)
).await.map_err(Error::KubeError)?;
return Ok(Action::requeue(Duration::from_secs(60)))
}
}
Expand All @@ -72,6 +76,10 @@ impl Reconciler for Install {
// Validate that the requested package exist in that distrib
if ! dist.have_component(self.spec.category.as_str(), self.spec.component.as_str()) {
self.update_status_missing_component(client, OPERATOR, vec!(format!("{:} - {:} is not known from {:?} distribution", self.spec.category.as_str(), self.spec.component.as_str(), dist_name))).await.map_err(Error::KubeError)?;
warn!("Missing component for {ns}.{name}");
recorder.publish(
events::from_check("Install", &name, "Missing component".to_string(), Some(format!("{:} - {:} is not known from {:?} distribution", self.spec.category.as_str(), self.spec.component.as_str(), dist_name)))
).await.map_err(Error::KubeError)?;
if dist.status.is_some() {
return Err(Error::IllegalInstall)
} else { // the dist is not yet updated, wait for it for 60s
Expand Down Expand Up @@ -144,6 +152,11 @@ impl Reconciler for Install {
}
}
if ! missing.is_empty() {
let note = missing[0].clone();
warn!("Missing dependencies for {ns}.{name}: {note}");
recorder.publish(
events::from_check("Install", &name, "Missing dependencies".to_string(), Some(note))
).await.map_err(Error::KubeError)?;
self.update_status_missing_component(client, OPERATOR, missing).await.map_err(Error::KubeError)?;
if should_fail {
return Err(Error::IllegalInstall)
Expand All @@ -153,7 +166,6 @@ impl Reconciler for Install {
}
// Use provided check script
if comp.check.is_some() {
info!("Starting the check script");
let check = comp.check.clone().unwrap();
let mut script = script::Script::from_str(&check, script::new_base_context(
self.spec.category.clone(),
Expand All @@ -163,16 +175,25 @@ impl Reconciler for Install {
));
let stage = "check".to_string();
let errors = match script.run_pre_stage(&stage) {
Ok(d) => Vec::new(),
Ok(_d) => Vec::new(),
Err(e) => {
let mut missing: Vec<String> = Vec::new();
missing.push(format!("{e}",e));
missing.push(format!("{e}"));
missing
}
};
if ! errors.is_empty() {
self.update_status_missing_component(client, OPERATOR, errors).await.map_err(Error::KubeError)?;
let note = errors[0].clone();
warn!("Validation script failed for {ns}.{name}: {note}");
recorder.publish(
events::from_check("Install", &name, "Validation failed".to_string(), Some(note))
).await.map_err(Error::KubeError)?;
self.update_status_check_failed(client, OPERATOR, errors).await.map_err(Error::KubeError)?;
return Ok(Action::requeue(Duration::from_secs(60)))
} else {
recorder.publish(
events::from_check("Install", &name, "Validation succeed".to_string(), None)
).await.map_err(Error::KubeError)?;
}
}
let hashedself = crate::jobs::HashedSelf::new(ns.as_str(), name.as_str(), self.options_digest().as_str(), self.spec.distrib.as_str(), &comp.commit_id);
Expand All @@ -191,32 +212,32 @@ impl Reconciler for Install {
info!("Creating {agent_name} Job");
self.update_status_agent_started(client, OPERATOR).await.map_err(Error::KubeError)?;
let job = jobs.create_install(agent_name.as_str(), &agent_job, action, name.as_str(), ns.as_str()).await.unwrap();
debug!("Sending event {agent_name} Job");
recorder.publish(
events::from_create("Install", &name, "Job", &job.name_any(), Some(job.object_ref(&())))
).await.map_err(Error::KubeError)?;
debug!("Waiting {agent_name} to finish Job");
jobs.wait_max(agent_name.as_str(),2*60).await.map_err(Error::WaitError)?.map_err(Error::JobError)?;
debug!("Waited {agent_name} OK");
} else {
info!("Patching {agent_name} Job");
let _job = match jobs.apply_install(agent_name.as_str(), &agent_job, action, name.as_str(), ns.as_str()).await {Ok(j)=>j,Err(_e)=>{
let job = match jobs.apply_install(agent_name.as_str(), &agent_job, action, name.as_str(), ns.as_str()).await {Ok(j)=>j,Err(_e)=>{
let job = jobs.get(agent_name.as_str()).await.unwrap();
recorder.publish(
events::from_delete("plan", &name, "Job", &job.name_any(), Some(job.object_ref(&())))
events::from_delete("Install", &name, "Job", &job.name_any(), Some(job.object_ref(&())))
).await.map_err(Error::KubeError)?;
jobs.delete(agent_name.as_str()).await.unwrap();
error!("Recreating {agent_name} Job");
recorder.publish(
events::from_create("Install", &name, "Job", &job.name_any(), Some(job.object_ref(&())))
).await.map_err(Error::KubeError)?;
jobs.create_install(agent_name.as_str(), &agent_job, action, name.as_str(), ns.as_str()).await.unwrap()
}};
// TODO: Detect if the job changed after the patch (or event better would change prior)
// TODO: Send a patched event if changed
/*debug!("Sending event for {plan_name} to finish Job");
recorder.publish(
events::from_patch("Install", &name, "Job", &_job.name_any(), Some(_job.object_ref(&())))
).await.map_err(Error::KubeError)?;*/
debug!("Waiting {agent_name} to finish Job");
jobs.wait_max(agent_name.as_str(),2*60).await.map_err(Error::WaitError)?.map_err(Error::JobError)?;
debug!("Waited {agent_name} OK");
if let Some(status) = job.status {
if status.completion_time.is_none() {
error!("Waiting after {agent_name} Job");
recorder.publish(
events::from_check("Install", &name, "Bootstrap in progress, requeue".to_string(), None)
).await.map_err(Error::KubeError)?;
jobs.wait_max(agent_name.as_str(),2*60).await.map_err(Error::WaitError)?.map_err(Error::JobError)?;
}
}
}
Ok(Action::requeue(Duration::from_secs(5 * 60)))
}
Expand Down Expand Up @@ -295,7 +316,7 @@ impl Reconciler for Install {
}

#[must_use] pub fn error_policy(inst: Arc<Install>, error: &Error, ctx: Arc<Context>) -> Action {
warn!("reconcile failed: {:?}", error);
warn!("reconcile failed for '{:?}.{:?}': {:?}", inst.metadata.namespace, inst.metadata.name, error);
ctx.metrics.inst_reconcile_failure(&inst, error);
Action::requeue(Duration::from_secs(5 * 60))
}
2 changes: 1 addition & 1 deletion operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub enum Error {
KubeError(#[source] kube::Error),

#[error("check Error: {0}")]
CheckError(#[source] k8s::Error),
CheckError(#[source] package::Error),

#[error("Kube wait job Error: {0}")]
WaitError(#[source] tokio::time::error::Elapsed),
Expand Down
5 changes: 3 additions & 2 deletions package/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ license = " BSD-3-Clause"
[dependencies]
k8s = { path = "../k8s" }
anyhow = "1.0.69"
rhai = { version = "1.12.0", features = ["serde"] }
rhai = { version = "1.12.0", features = ["sync", "serde"] }
serde = { version = "1.0.152", features = ["derive"] }
log = "0.4.17"
tracing = "0.1.37"
regex = "1.7.3"
futures = "0.3.25"
indexmap = "1.9.3"
serde_json = "1.0.95"
handlebars = "4.3.6"
schemars = { version = "0.8.11", features = ["chrono"] }
tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] }

[lib]
name = "package"
Expand Down
Loading

0 comments on commit d4a59e3

Please sign in to comment.