diff --git a/CHANGELOG.md b/CHANGELOG.md index d8e72d5..aa491c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,13 @@ All notable changes to this project will be documented in this file. ### Fixed - BREAKING: The fields `connection` and `host` on `S3Connection` as well as `bucketName` on `S3Bucket`are now mandatory. Previously operators errored out in case these fields where missing ([#283]). +- Failing to parse one `ZookeeperCluster`/`ZookeeperZnode` should no longer cause the whole operator to stop functioning ([#293]). +- The StatefulSet restarter service now only retrieves metadata for ConfigMaps and Secrets, rather than full objects ([#293]). [#283]: https://github.com/stackabletech/commons-operator/pull/283 [#285]: https://github.com/stackabletech/commons-operator/pull/285 [#290]: https://github.com/stackabletech/commons-operator/pull/290 +[#293]: https://github.com/stackabletech/commons-operator/pull/293 ## [24.7.0] - 2024-07-24 diff --git a/rust/operator-binary/src/pod_enrichment_controller.rs b/rust/operator-binary/src/pod_enrichment_controller.rs index 2ecedf7..a84d81c 100644 --- a/rust/operator-binary/src/pod_enrichment_controller.rs +++ b/rust/operator-binary/src/pod_enrichment_controller.rs @@ -5,8 +5,9 @@ use snafu::{ResultExt, Snafu}; use stackable_operator::{ k8s_openapi::api::core::v1::{Node, Pod}, kube::{ - core::ObjectMeta, + core::{error_boundary, DeserializeGuard, ObjectMeta}, runtime::{controller, reflector::ObjectRef, watcher, Controller}, + Resource, }, logging::controller::{report_controller_reconciled, ReconcilerError}, namespace::WatchNamespace, @@ -23,11 +24,17 @@ struct Ctx { #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] pub enum Error { + #[snafu(display("Pod object is invalid"))] + InvalidPod { + source: error_boundary::InvalidObject, + }, + #[snafu(display("failed to get {node} for Pod"))] GetNode { source: stackable_operator::client::Error, node: ObjectRef, }, + #[snafu(display("failed to update Pod"))] UpdatePod { source: stackable_operator::client::Error, @@ -41,6 +48,7 @@ impl ReconcilerError for Error { fn secondary_object(&self) -> Option> { match self { + Error::InvalidPod { source: _ } => None, Error::GetNode { node, .. } => Some(node.clone().erase()), Error::UpdatePod { source: _ } => None, } @@ -49,20 +57,23 @@ impl ReconcilerError for Error { pub async fn start(client: &stackable_operator::client::Client, watch_namespace: &WatchNamespace) { let controller = Controller::new( - watch_namespace.get_api::(client), + watch_namespace.get_api::>(client), watcher::Config::default().labels("enrichment.stackable.tech/enabled=true"), ); let pods = controller.store(); controller .watches( - client.get_all_api::(), + client.get_all_api::>(), watcher::Config::default(), move |node| { pods.state() .into_iter() .filter(move |pod| { + let Ok(pod) = &pod.0 else { + return false; + }; pod.spec.as_ref().and_then(|s| s.node_name.as_deref()) - == node.metadata.name.as_deref() + == node.meta().name.as_deref() }) .map(|pod| ObjectRef::from_obj(&*pod)) }, @@ -86,7 +97,17 @@ pub enum NodeAddressType { InternalIP, } -async fn reconcile(pod: Arc, ctx: Arc) -> Result { +async fn reconcile( + pod: Arc>, + ctx: Arc, +) -> Result { + tracing::info!("Starting reconcile"); + let pod = pod + .0 + .as_ref() + .map_err(error_boundary::InvalidObject::clone) + .context(InvalidPodSnafu)?; + let node_name = pod.spec.as_ref().and_then(|s| s.node_name.as_deref()); let node = if let Some(node_name) = node_name { ctx.client @@ -133,6 +154,15 @@ async fn reconcile(pod: Arc, ctx: Arc) -> Result, _error: &Error, _ctx: Arc) -> controller::Action { - controller::Action::requeue(Duration::from_secs(5)) +fn error_policy( + _obj: Arc>, + error: &Error, + _ctx: Arc, +) -> controller::Action { + match error { + // root object is invalid, will be requeued when modified anyway + Error::InvalidPod { .. } => controller::Action::await_change(), + + _ => controller::Action::requeue(Duration::from_secs(5)), + } } diff --git a/rust/operator-binary/src/restart_controller/pod.rs b/rust/operator-binary/src/restart_controller/pod.rs index e83dbae..bba406c 100644 --- a/rust/operator-binary/src/restart_controller/pod.rs +++ b/rust/operator-binary/src/restart_controller/pod.rs @@ -10,7 +10,7 @@ use stackable_operator::{ }, kube::{ self, - api::EvictParams, + api::{EvictParams, PartialObjectMeta}, core::DynamicObject, runtime::{controller::Action, reflector::ObjectRef, watcher, Controller}, }, @@ -63,7 +63,7 @@ impl ReconcilerError for Error { pub async fn start(client: &Client, watch_namespace: &WatchNamespace) { let controller = Controller::new( - watch_namespace.get_api::(client), + watch_namespace.get_api::>(client), watcher::Config::default(), ); controller @@ -80,7 +80,7 @@ pub async fn start(client: &Client, watch_namespace: &WatchNamespace) { .await; } -async fn reconcile(pod: Arc, ctx: Arc) -> Result { +async fn reconcile(pod: Arc>, ctx: Arc) -> Result { tracing::info!("Starting reconciliation .."); if pod.metadata.deletion_timestamp.is_some() { // Object is already being deleted, no point trying again @@ -163,6 +163,6 @@ async fn reconcile(pod: Arc, ctx: Arc) -> Result { } } -fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> Action { +fn error_policy(_obj: Arc>, _error: &Error, _ctx: Arc) -> Action { Action::requeue(Duration::from_secs(5)) } diff --git a/rust/operator-binary/src/restart_controller/statefulset.rs b/rust/operator-binary/src/restart_controller/statefulset.rs index caaa03d..358f9d1 100644 --- a/rust/operator-binary/src/restart_controller/statefulset.rs +++ b/rust/operator-binary/src/restart_controller/statefulset.rs @@ -12,13 +12,15 @@ use stackable_operator::k8s_openapi::api::core::v1::{ ConfigMap, EnvFromSource, EnvVar, PodSpec, Secret, Volume, }; use stackable_operator::kube; -use stackable_operator::kube::api::{Patch, PatchParams}; -use stackable_operator::kube::core::DynamicObject; +use stackable_operator::kube::api::{PartialObjectMeta, Patch, PatchParams}; +use stackable_operator::kube::core::{error_boundary, DeserializeGuard, DynamicObject}; use stackable_operator::kube::runtime::controller::{ trigger_self, trigger_with, Action, ReconcileRequest, }; use stackable_operator::kube::runtime::reflector::{ObjectRef, Store}; -use stackable_operator::kube::runtime::{applier, reflector, watcher, Config, WatchStreamExt}; +use stackable_operator::kube::runtime::{ + applier, metadata_watcher, reflector, watcher, Config, WatchStreamExt, +}; use stackable_operator::kube::{Resource, ResourceExt}; use stackable_operator::logging::controller::{report_controller_reconciled, ReconcilerError}; use stackable_operator::namespace::WatchNamespace; @@ -26,15 +28,20 @@ use strum::{EnumDiscriminants, IntoStaticStr}; struct Ctx { kube: kube::Client, - cms: Store, + cms: Store>, cms_inited: Arc, - secrets: Store, + secrets: Store>, secrets_inited: Arc, } #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] enum Error { + #[snafu(display("StatefulSet object is invalid"))] + InvalidStatefulSet { + source: error_boundary::InvalidObject, + }, + #[snafu(display("failed to patch object {obj_ref}"))] PatchFailed { source: kube::Error, @@ -55,6 +62,7 @@ impl ReconcilerError for Error { fn secondary_object(&self) -> Option> { match self { + Error::InvalidStatefulSet { source: _ } => None, Error::PatchFailed { obj_ref, .. } => Some(*obj_ref.clone()), Error::ConfigMapsUninitialized => None, Error::SecretsUninitialized => None, @@ -63,12 +71,12 @@ impl ReconcilerError for Error { } pub async fn start(client: &Client, watch_namespace: &WatchNamespace) { - let stses = watch_namespace.get_api::(client); + let stses = watch_namespace.get_api::>(client); let cms = watch_namespace.get_api::(client); let secrets = watch_namespace.get_api::(client); - let sts_store = reflector::store::Writer::new(()); - let cm_store = reflector::store::Writer::new(()); - let secret_store = reflector::store::Writer::new(()); + let sts_store = reflector::store::Writer::>::new(()); + let cm_store = reflector::store::Writer::>::new(()); + let secret_store = reflector::store::Writer::>::new(()); let cms_inited = Arc::new(AtomicBool::from(false)); let secrets_inited = Arc::new(AtomicBool::from(false)); @@ -86,17 +94,18 @@ pub async fn start(client: &Client, watch_namespace: &WatchNamespace) { stream::select( stream::select( trigger_all( - reflector(cm_store, watcher(cms, watcher::Config::default())) + reflector(cm_store, metadata_watcher(cms, watcher::Config::default())) .inspect(|_| cms_inited.store(true, std::sync::atomic::Ordering::SeqCst)) .touched_objects(), sts_store.as_reader(), ), trigger_all( - reflector(secret_store, watcher(secrets, watcher::Config::default())) - .inspect(|_| { - secrets_inited.store(true, std::sync::atomic::Ordering::SeqCst) - }) - .touched_objects(), + reflector( + secret_store, + metadata_watcher(secrets, watcher::Config::default()), + ) + .inspect(|_| secrets_inited.store(true, std::sync::atomic::Ordering::SeqCst)) + .touched_objects(), sts_store.as_reader(), ), ), @@ -161,7 +170,17 @@ fn find_pod_refs<'a, K: Resource + 'a>( .chain(container_env_from_refs) } -async fn reconcile(sts: Arc, ctx: Arc) -> Result { +async fn reconcile( + sts: Arc>, + ctx: Arc, +) -> Result { + tracing::info!("Starting reconcile"); + let sts = sts + .0 + .as_ref() + .map_err(error_boundary::InvalidObject::clone) + .context(InvalidStatefulSetSnafu)?; + if !ctx.cms_inited.load(std::sync::atomic::Ordering::SeqCst) { return ConfigMapsUninitializedSnafu.fail(); } @@ -181,12 +200,12 @@ async fn reconcile(sts: Arc, ctx: Arc) -> Result::new( + Some(ObjectRef::>::new( &volume.config_map.as_ref()?.name, )) }, |env_var| { - Some(ObjectRef::::new( + Some(ObjectRef::>::new( &env_var .value_from .as_ref()? @@ -196,7 +215,7 @@ async fn reconcile(sts: Arc, ctx: Arc) -> Result::new( + Some(ObjectRef::>::new( &env_from.config_map_ref.as_ref()?.name, )) }, @@ -225,17 +244,17 @@ async fn reconcile(sts: Arc, ctx: Arc) -> Result::new( + Some(ObjectRef::>::new( volume.secret.as_ref()?.secret_name.as_deref()?, )) }, |env_var| { - Some(ObjectRef::::new( + Some(ObjectRef::>::new( &env_var.value_from.as_ref()?.secret_key_ref.as_ref()?.name, )) }, |env_from| { - Some(ObjectRef::::new( + Some(ObjectRef::>::new( &env_from.secret_ref.as_ref()?.name, )) }, @@ -290,11 +309,16 @@ async fn reconcile(sts: Arc, ctx: Arc) -> Result, _error: &Error, _ctx: Arc) -> Action { - Action::requeue(Duration::from_secs(5)) +fn error_policy(_obj: Arc>, error: &Error, _ctx: Arc) -> Action { + match error { + // root object is invalid, will be requeued when modified anyway + Error::InvalidStatefulSet { .. } => Action::await_change(), + + _ => Action::requeue(Duration::from_secs(5)), + } }