Skip to content

Commit

Permalink
Migrate to DeserializeGuard (#293)
Browse files Browse the repository at this point in the history
* Migrate to DeserializeGuard

Some cases have been migrated to PartialObjectMeta instead where only
the metadata is used anyway.

Part of stackabletech/issues#211. Fixes #237.

* Changelog

* Avoid requesting CM/secret data entirely

Instead of requesting it but throwing it away.
  • Loading branch information
nightkr authored Oct 24, 2024
1 parent c379651 commit ee83f52
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 36 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ All notable changes to this project will be documented in this file.
### Fixed

- BREAKING: The fields `connection` and `host` on `S3Connection` as well as `bucketName` on `S3Bucket`are now mandatory. Previously operators errored out in case these fields where missing ([#283]).
- Failing to parse one `ZookeeperCluster`/`ZookeeperZnode` should no longer cause the whole operator to stop functioning ([#293]).
- The StatefulSet restarter service now only retrieves metadata for ConfigMaps and Secrets, rather than full objects ([#293]).

[#283]: https://github.com/stackabletech/commons-operator/pull/283
[#285]: https://github.com/stackabletech/commons-operator/pull/285
[#290]: https://github.com/stackabletech/commons-operator/pull/290
[#293]: https://github.com/stackabletech/commons-operator/pull/293

## [24.7.0] - 2024-07-24

Expand Down
44 changes: 37 additions & 7 deletions rust/operator-binary/src/pod_enrichment_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use snafu::{ResultExt, Snafu};
use stackable_operator::{
k8s_openapi::api::core::v1::{Node, Pod},
kube::{
core::ObjectMeta,
core::{error_boundary, DeserializeGuard, ObjectMeta},
runtime::{controller, reflector::ObjectRef, watcher, Controller},
Resource,
},
logging::controller::{report_controller_reconciled, ReconcilerError},
namespace::WatchNamespace,
Expand All @@ -23,11 +24,17 @@ struct Ctx {
#[derive(Snafu, Debug, EnumDiscriminants)]
#[strum_discriminants(derive(IntoStaticStr))]
pub enum Error {
#[snafu(display("Pod object is invalid"))]
InvalidPod {
source: error_boundary::InvalidObject,
},

#[snafu(display("failed to get {node} for Pod"))]
GetNode {
source: stackable_operator::client::Error,
node: ObjectRef<Node>,
},

#[snafu(display("failed to update Pod"))]
UpdatePod {
source: stackable_operator::client::Error,
Expand All @@ -41,6 +48,7 @@ impl ReconcilerError for Error {

fn secondary_object(&self) -> Option<ObjectRef<stackable_operator::kube::core::DynamicObject>> {
match self {
Error::InvalidPod { source: _ } => None,
Error::GetNode { node, .. } => Some(node.clone().erase()),
Error::UpdatePod { source: _ } => None,
}
Expand All @@ -49,20 +57,23 @@ impl ReconcilerError for Error {

pub async fn start(client: &stackable_operator::client::Client, watch_namespace: &WatchNamespace) {
let controller = Controller::new(
watch_namespace.get_api::<Pod>(client),
watch_namespace.get_api::<DeserializeGuard<Pod>>(client),
watcher::Config::default().labels("enrichment.stackable.tech/enabled=true"),
);
let pods = controller.store();
controller
.watches(
client.get_all_api::<Node>(),
client.get_all_api::<DeserializeGuard<Node>>(),
watcher::Config::default(),
move |node| {
pods.state()
.into_iter()
.filter(move |pod| {
let Ok(pod) = &pod.0 else {
return false;
};
pod.spec.as_ref().and_then(|s| s.node_name.as_deref())
== node.metadata.name.as_deref()
== node.meta().name.as_deref()
})
.map(|pod| ObjectRef::from_obj(&*pod))
},
Expand All @@ -86,7 +97,17 @@ pub enum NodeAddressType {
InternalIP,
}

async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<controller::Action, Error> {
async fn reconcile(
pod: Arc<DeserializeGuard<Pod>>,
ctx: Arc<Ctx>,
) -> Result<controller::Action, Error> {
tracing::info!("Starting reconcile");
let pod = pod
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidPodSnafu)?;

let node_name = pod.spec.as_ref().and_then(|s| s.node_name.as_deref());
let node = if let Some(node_name) = node_name {
ctx.client
Expand Down Expand Up @@ -133,6 +154,15 @@ async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<controller::Action, E
Ok(controller::Action::await_change())
}

fn error_policy(_obj: Arc<Pod>, _error: &Error, _ctx: Arc<Ctx>) -> controller::Action {
controller::Action::requeue(Duration::from_secs(5))
fn error_policy(
_obj: Arc<DeserializeGuard<Pod>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> controller::Action {
match error {
// root object is invalid, will be requeued when modified anyway
Error::InvalidPod { .. } => controller::Action::await_change(),

_ => controller::Action::requeue(Duration::from_secs(5)),
}
}
8 changes: 4 additions & 4 deletions rust/operator-binary/src/restart_controller/pod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use stackable_operator::{
},
kube::{
self,
api::EvictParams,
api::{EvictParams, PartialObjectMeta},
core::DynamicObject,
runtime::{controller::Action, reflector::ObjectRef, watcher, Controller},
},
Expand Down Expand Up @@ -63,7 +63,7 @@ impl ReconcilerError for Error {

pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
let controller = Controller::new(
watch_namespace.get_api::<Pod>(client),
watch_namespace.get_api::<PartialObjectMeta<Pod>>(client),
watcher::Config::default(),
);
controller
Expand All @@ -80,7 +80,7 @@ pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
.await;
}

async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<Action, Error> {
async fn reconcile(pod: Arc<PartialObjectMeta<Pod>>, ctx: Arc<Ctx>) -> Result<Action, Error> {
tracing::info!("Starting reconciliation ..");
if pod.metadata.deletion_timestamp.is_some() {
// Object is already being deleted, no point trying again
Expand Down Expand Up @@ -163,6 +163,6 @@ async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<Action, Error> {
}
}

fn error_policy(_obj: Arc<Pod>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
fn error_policy(_obj: Arc<PartialObjectMeta<Pod>>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(Duration::from_secs(5))
}
74 changes: 49 additions & 25 deletions rust/operator-binary/src/restart_controller/statefulset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,36 @@ use stackable_operator::k8s_openapi::api::core::v1::{
ConfigMap, EnvFromSource, EnvVar, PodSpec, Secret, Volume,
};
use stackable_operator::kube;
use stackable_operator::kube::api::{Patch, PatchParams};
use stackable_operator::kube::core::DynamicObject;
use stackable_operator::kube::api::{PartialObjectMeta, Patch, PatchParams};
use stackable_operator::kube::core::{error_boundary, DeserializeGuard, DynamicObject};
use stackable_operator::kube::runtime::controller::{
trigger_self, trigger_with, Action, ReconcileRequest,
};
use stackable_operator::kube::runtime::reflector::{ObjectRef, Store};
use stackable_operator::kube::runtime::{applier, reflector, watcher, Config, WatchStreamExt};
use stackable_operator::kube::runtime::{
applier, metadata_watcher, reflector, watcher, Config, WatchStreamExt,
};
use stackable_operator::kube::{Resource, ResourceExt};
use stackable_operator::logging::controller::{report_controller_reconciled, ReconcilerError};
use stackable_operator::namespace::WatchNamespace;
use strum::{EnumDiscriminants, IntoStaticStr};

struct Ctx {
kube: kube::Client,
cms: Store<ConfigMap>,
cms: Store<PartialObjectMeta<ConfigMap>>,
cms_inited: Arc<AtomicBool>,
secrets: Store<Secret>,
secrets: Store<PartialObjectMeta<Secret>>,
secrets_inited: Arc<AtomicBool>,
}

#[derive(Snafu, Debug, EnumDiscriminants)]
#[strum_discriminants(derive(IntoStaticStr))]
enum Error {
#[snafu(display("StatefulSet object is invalid"))]
InvalidStatefulSet {
source: error_boundary::InvalidObject,
},

#[snafu(display("failed to patch object {obj_ref}"))]
PatchFailed {
source: kube::Error,
Expand All @@ -55,6 +62,7 @@ impl ReconcilerError for Error {

fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> {
match self {
Error::InvalidStatefulSet { source: _ } => None,
Error::PatchFailed { obj_ref, .. } => Some(*obj_ref.clone()),
Error::ConfigMapsUninitialized => None,
Error::SecretsUninitialized => None,
Expand All @@ -63,12 +71,12 @@ impl ReconcilerError for Error {
}

pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
let stses = watch_namespace.get_api::<StatefulSet>(client);
let stses = watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(client);
let cms = watch_namespace.get_api::<ConfigMap>(client);
let secrets = watch_namespace.get_api::<Secret>(client);
let sts_store = reflector::store::Writer::new(());
let cm_store = reflector::store::Writer::new(());
let secret_store = reflector::store::Writer::new(());
let sts_store = reflector::store::Writer::<DeserializeGuard<StatefulSet>>::new(());
let cm_store = reflector::store::Writer::<PartialObjectMeta<ConfigMap>>::new(());
let secret_store = reflector::store::Writer::<PartialObjectMeta<Secret>>::new(());
let cms_inited = Arc::new(AtomicBool::from(false));
let secrets_inited = Arc::new(AtomicBool::from(false));

Expand All @@ -86,17 +94,18 @@ pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
stream::select(
stream::select(
trigger_all(
reflector(cm_store, watcher(cms, watcher::Config::default()))
reflector(cm_store, metadata_watcher(cms, watcher::Config::default()))
.inspect(|_| cms_inited.store(true, std::sync::atomic::Ordering::SeqCst))
.touched_objects(),
sts_store.as_reader(),
),
trigger_all(
reflector(secret_store, watcher(secrets, watcher::Config::default()))
.inspect(|_| {
secrets_inited.store(true, std::sync::atomic::Ordering::SeqCst)
})
.touched_objects(),
reflector(
secret_store,
metadata_watcher(secrets, watcher::Config::default()),
)
.inspect(|_| secrets_inited.store(true, std::sync::atomic::Ordering::SeqCst))
.touched_objects(),
sts_store.as_reader(),
),
),
Expand Down Expand Up @@ -161,7 +170,17 @@ fn find_pod_refs<'a, K: Resource + 'a>(
.chain(container_env_from_refs)
}

async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error> {
async fn reconcile(
sts: Arc<DeserializeGuard<StatefulSet>>,
ctx: Arc<Ctx>,
) -> Result<Action, Error> {
tracing::info!("Starting reconcile");
let sts = sts
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidStatefulSetSnafu)?;

if !ctx.cms_inited.load(std::sync::atomic::Ordering::SeqCst) {
return ConfigMapsUninitializedSnafu.fail();
}
Expand All @@ -181,12 +200,12 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
find_pod_refs(
pod_spec,
|volume| {
Some(ObjectRef::<ConfigMap>::new(
Some(ObjectRef::<PartialObjectMeta<ConfigMap>>::new(
&volume.config_map.as_ref()?.name,
))
},
|env_var| {
Some(ObjectRef::<ConfigMap>::new(
Some(ObjectRef::<PartialObjectMeta<ConfigMap>>::new(
&env_var
.value_from
.as_ref()?
Expand All @@ -196,7 +215,7 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
))
},
|env_from| {
Some(ObjectRef::<ConfigMap>::new(
Some(ObjectRef::<PartialObjectMeta<ConfigMap>>::new(
&env_from.config_map_ref.as_ref()?.name,
))
},
Expand Down Expand Up @@ -225,17 +244,17 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
find_pod_refs(
pod_spec,
|volume| {
Some(ObjectRef::<Secret>::new(
Some(ObjectRef::<PartialObjectMeta<Secret>>::new(
volume.secret.as_ref()?.secret_name.as_deref()?,
))
},
|env_var| {
Some(ObjectRef::<Secret>::new(
Some(ObjectRef::<PartialObjectMeta<Secret>>::new(
&env_var.value_from.as_ref()?.secret_key_ref.as_ref()?.name,
))
},
|env_from| {
Some(ObjectRef::<Secret>::new(
Some(ObjectRef::<PartialObjectMeta<Secret>>::new(
&env_from.secret_ref.as_ref()?.name,
))
},
Expand Down Expand Up @@ -290,11 +309,16 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
)
.await
.context(PatchFailedSnafu {
obj_ref: ObjectRef::from_obj(sts.as_ref()).erase(),
obj_ref: ObjectRef::from_obj(sts).erase(),
})?;
Ok(Action::await_change())
}

fn error_policy(_obj: Arc<StatefulSet>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(Duration::from_secs(5))
fn error_policy(_obj: Arc<DeserializeGuard<StatefulSet>>, error: &Error, _ctx: Arc<Ctx>) -> Action {
match error {
// root object is invalid, will be requeued when modified anyway
Error::InvalidStatefulSet { .. } => Action::await_change(),

_ => Action::requeue(Duration::from_secs(5)),
}
}

0 comments on commit ee83f52

Please sign in to comment.