Skip to content

Commit

Permalink
fix: don't stop the reconciliation if one cluster is invalid (#575)
Browse files Browse the repository at this point in the history
  • Loading branch information
razvan authored Oct 23, 2024
1 parent 20898d0 commit 3807c82
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

- Implement `envOverrides` for HbaseCluster ([#550]).
- Omid test: use 1.1.2, update default port number and raise test timeout ([#556]).
- An invalid `HBaseCluster` doesn't cause the operator to stop functioning (#[575]).

### Removed

Expand Down
63 changes: 41 additions & 22 deletions rust/operator-binary/src/hbase_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use stackable_operator::{
apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
DeepMerge,
},
kube::core::{error_boundary, DeserializeGuard},
kube::{runtime::controller::Action, Resource},
kvp::{Label, LabelError, Labels, ObjectLabels},
logging::controller::ReconcilerError,
Expand Down Expand Up @@ -291,6 +292,11 @@ pub enum Error {

#[snafu(display("authorization is only supported from HBase 2.6 onwards"))]
AuthorizationNotSupported,

#[snafu(display("HBaseCluster object is invalid"))]
InvalidHBaseCluster {
source: error_boundary::InvalidObject,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -301,31 +307,39 @@ impl ReconcilerError for Error {
}
}

pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<Action> {
pub async fn reconcile_hbase(
hbase: Arc<DeserializeGuard<HbaseCluster>>,
ctx: Arc<Ctx>,
) -> Result<Action> {
tracing::info!("Starting reconcile");

let hbase = hbase
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidHBaseClusterSnafu)?;

let client = &ctx.client;

validate_cr(&hbase)?;
validate_cr(hbase)?;

let resolved_product_image = hbase
.spec
.image
.resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION);
let zookeeper_connection_information = ZookeeperConnectionInformation::retrieve(&hbase, client)
let zookeeper_connection_information = ZookeeperConnectionInformation::retrieve(hbase, client)
.await
.context(RetrieveZookeeperConnectionInformationSnafu)?;

let vector_aggregator_address = resolve_vector_aggregator_address(&hbase, client)
let vector_aggregator_address = resolve_vector_aggregator_address(hbase, client)
.await
.context(ResolveVectorAggregatorAddressSnafu)?;

let roles = build_roles(&hbase)?;
let roles = build_roles(hbase)?;

let validated_config = validate_all_roles_and_groups_config(
&resolved_product_image.app_version_label,
&transform_all_roles_to_config(hbase.as_ref(), roles)
.context(GenerateProductConfigSnafu)?,
&transform_all_roles_to_config(hbase, roles).context(GenerateProductConfigSnafu)?,
&ctx.product_config,
false,
false,
Expand All @@ -334,7 +348,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<

let hbase_opa_config = match &hbase.spec.cluster_config.authorization {
Some(opa_config) => Some(
HbaseOpaConfig::from_opa_config(client, &hbase, opa_config)
HbaseOpaConfig::from_opa_config(client, hbase, opa_config)
.await
.context(InvalidOpaConfigSnafu)?,
),
Expand All @@ -351,15 +365,15 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
.context(CreateClusterResourcesSnafu)?;

let region_server_role_service =
build_region_server_role_service(&hbase, &resolved_product_image)?;
build_region_server_role_service(hbase, &resolved_product_image)?;
cluster_resources
.add(client, region_server_role_service)
.await
.context(ApplyRoleServiceSnafu)?;

// discovery config map
let discovery_cm = build_discovery_configmap(
&hbase,
hbase,
&zookeeper_connection_information,
&resolved_product_image,
)
Expand All @@ -370,7 +384,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
.context(ApplyDiscoveryConfigMapSnafu)?;

let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
hbase.as_ref(),
hbase,
APP_NAME,
cluster_resources
.get_required_labels()
Expand Down Expand Up @@ -404,9 +418,9 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
.context(FailedToResolveConfigSnafu)?;

let rg_service =
build_rolegroup_service(&hbase, &hbase_role, &rolegroup, &resolved_product_image)?;
build_rolegroup_service(hbase, &hbase_role, &rolegroup, &resolved_product_image)?;
let rg_configmap = build_rolegroup_config_map(
&hbase,
hbase,
&rolegroup,
rolegroup_config,
&zookeeper_connection_information,
Expand All @@ -416,7 +430,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
vector_aggregator_address.as_deref(),
)?;
let rg_statefulset = build_rolegroup_statefulset(
&hbase,
hbase,
&hbase_role,
&rolegroup,
rolegroup_config,
Expand Down Expand Up @@ -450,7 +464,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
pod_disruption_budget: pdb,
}) = role_config
{
add_pdbs(pdb, &hbase, &hbase_role, client, &mut cluster_resources)
add_pdbs(pdb, hbase, &hbase_role, client, &mut cluster_resources)
.await
.context(FailedToCreatePdbSnafu)?;
}
Expand All @@ -460,18 +474,15 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
ClusterOperationsConditionBuilder::new(&hbase.spec.cluster_operation);

let status = HbaseClusterStatus {
conditions: compute_conditions(
hbase.as_ref(),
&[&ss_cond_builder, &cluster_operation_cond_builder],
),
conditions: compute_conditions(hbase, &[&ss_cond_builder, &cluster_operation_cond_builder]),
};

cluster_resources
.delete_orphaned_resources(client)
.await
.context(DeleteOrphanedResourcesSnafu)?;
client
.apply_patch_status(OPERATOR_NAME, hbase.as_ref(), &status)
.apply_patch_status(OPERATOR_NAME, hbase, &status)
.await
.context(ApplyStatusSnafu)?;

Expand Down Expand Up @@ -1071,8 +1082,16 @@ where
})
}

pub fn error_policy(_obj: Arc<HbaseCluster>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(*Duration::from_secs(5))
pub fn error_policy(
_obj: Arc<DeserializeGuard<HbaseCluster>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
match error {
// root object is invalid, will be requed when modified
Error::InvalidHBaseCluster { .. } => Action::await_change(),
_ => Action::requeue(*Duration::from_secs(5)),
}
}

pub fn build_recommended_labels<'a>(
Expand Down
3 changes: 2 additions & 1 deletion rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use stackable_hbase_crd::{HbaseCluster, APP_NAME};
use stackable_operator::{
cli::{Command, ProductOperatorRun},
k8s_openapi::api::{apps::v1::StatefulSet, core::v1::Service},
kube::core::DeserializeGuard,
kube::runtime::{controller::Controller, watcher},
logging::controller::report_controller_reconciled,
CustomResourceExt,
Expand Down Expand Up @@ -66,7 +67,7 @@ async fn main() -> anyhow::Result<()> {
stackable_operator::client::create_client(Some(OPERATOR_NAME.to_string())).await?;

Controller::new(
watch_namespace.get_api::<HbaseCluster>(&client),
watch_namespace.get_api::<DeserializeGuard<HbaseCluster>>(&client),
watcher::Config::default(),
)
.owns(
Expand Down

0 comments on commit 3807c82

Please sign in to comment.