Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Avoid unnecessary k8s::Client creations #295

Merged
merged 5 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,8 @@ impl ResourceRequests {
/// Validates the struct [`ResourceRequests`] by comparing the required
/// resources to the available ones in the current cluster. `object_name`
/// should be `stack` or `demo`.
pub async fn validate_cluster_size(&self, object_name: &str) -> Result<()> {
let kube_client = Client::new().await.context(KubeClientCreateSnafu)?;
let cluster_info = kube_client
.get_cluster_info()
.await
.context(ClusterInfoSnafu)?;
pub async fn validate_cluster_size(&self, client: &Client, object_name: &str) -> Result<()> {
let cluster_info = client.get_cluster_info().await.context(ClusterInfoSnafu)?;

let stack_cpu =
CpuQuantity::try_from(&self.cpu).context(ParseCpuResourceRequirementsSnafu)?;
Expand Down
8 changes: 4 additions & 4 deletions rust/stackable-cockpit/src/platform/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use kube::{core::DynamicObject, ResourceExt};
use serde::Serialize;
use snafu::{OptionExt, ResultExt, Snafu};

use crate::utils::k8s;
use crate::utils::k8s::{self, Client};

pub type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -34,7 +34,7 @@ impl Display for Credentials {
/// and/or `password_key` are not found or the product does not provide
/// any credentials.
pub async fn get(
kube_client: &k8s::Client,
client: &Client,
product_name: &str,
stacklet: &DynamicObject,
) -> Result<Option<Credentials>> {
Expand All @@ -56,7 +56,7 @@ pub async fn get(
.as_str()
.context(NoSecretSnafu)?;

kube_client
client
.get_credentials_from_secret(
secret_name,
&stacklet.namespace().unwrap(),
Expand All @@ -71,7 +71,7 @@ pub async fn get(
.as_str()
.context(NoSecretSnafu)?;

kube_client
client
.get_credentials_from_secret(
secret_name,
&stacklet.namespace().unwrap(),
Expand Down
36 changes: 27 additions & 9 deletions rust/stackable-cockpit/src/platform/demo/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ use crate::{
release::ReleaseList,
stack::{self, StackInstallParameters, StackList},
},
utils::params::{
IntoParameters, IntoParametersError, Parameter, RawParameter, RawParameterParseError,
utils::{
k8s::Client,
params::{
IntoParameters, IntoParametersError, Parameter, RawParameter, RawParameterParseError,
},
},
xfer::{self, Client},
xfer,
};

pub type RawDemoParameterParseError = RawParameterParseError;
Expand Down Expand Up @@ -94,7 +97,11 @@ impl DemoSpec {
/// - Does the demo support to be installed in the requested namespace?
/// - Does the cluster have enough resources available to run this demo?
#[instrument(skip_all)]
pub async fn check_prerequisites(&self, product_namespace: &str) -> Result<(), Error> {
pub async fn check_prerequisites(
&self,
client: &Client,
product_namespace: &str,
) -> Result<(), Error> {
debug!("Checking prerequisites before installing demo");

// Returns an error if the demo doesn't support to be installed in the
Expand All @@ -109,7 +116,10 @@ impl DemoSpec {
// Checks if the available cluster resources are sufficient to deploy
// the demo.
if let Some(resource_requests) = &self.resource_requests {
if let Err(err) = resource_requests.validate_cluster_size("demo").await {
if let Err(err) = resource_requests
.validate_cluster_size(client, "demo")
.await
{
match err {
ResourceRequestsError::ValidationErrors { errors } => {
for error in errors {
Expand All @@ -129,15 +139,16 @@ impl DemoSpec {
stack_list: StackList,
release_list: ReleaseList,
install_parameters: DemoInstallParameters,
transfer_client: &Client,
client: &Client,
transfer_client: &xfer::Client,
) -> Result<(), Error> {
// Get the stack spec based on the name defined in the demo spec
let stack = stack_list.get(&self.stack).context(NoSuchStackSnafu {
name: self.stack.clone(),
})?;

// Check demo prerequisites
self.check_prerequisites(&install_parameters.product_namespace)
self.check_prerequisites(client, &install_parameters.product_namespace)
.await?;

let stack_install_parameters = StackInstallParameters {
Expand All @@ -151,19 +162,25 @@ impl DemoSpec {
};

stack
.install(release_list, stack_install_parameters, transfer_client)
.install(
release_list,
stack_install_parameters,
client,
transfer_client,
)
.await
.context(InstallStackSnafu)?;

// Install demo manifests
self.prepare_manifests(install_parameters, transfer_client)
self.prepare_manifests(install_parameters, client, transfer_client)
.await
}

#[instrument(skip_all)]
async fn prepare_manifests(
&self,
install_params: DemoInstallParameters,
client: &Client,
transfer_client: &xfer::Client,
) -> Result<(), Error> {
info!("Installing demo manifests");
Expand All @@ -179,6 +196,7 @@ impl DemoSpec {
&params,
&install_params.product_namespace,
install_params.labels,
client,
transfer_client,
)
.await
Expand Down
9 changes: 4 additions & 5 deletions rust/stackable-cockpit/src/platform/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
common::manifest::ManifestSpec,
helm,
utils::{
k8s,
k8s::{self, Client},
path::{IntoPathOrUrl, PathOrUrlParseError},
},
xfer::{
Expand Down Expand Up @@ -61,20 +61,19 @@ pub enum Error {
}

pub trait InstallManifestsExt {
// TODO (Techassi): This step shouldn't care about templating the manifests nor fecthing them from remote
// TODO (Techassi): This step shouldn't care about templating the manifests nor fetching them from remote
#[instrument(skip_all)]
#[allow(async_fn_in_trait)]
async fn install_manifests(
manifests: &[ManifestSpec],
parameters: &HashMap<String, String>,
product_namespace: &str,
labels: Labels,
client: &Client,
transfer_client: &xfer::Client,
) -> Result<(), Error> {
debug!("Installing demo / stack manifests");

let kube_client = k8s::Client::new().await.context(CreateKubeClientSnafu)?;

for manifest in manifests {
match manifest {
ManifestSpec::HelmChart(helm_file) => {
Expand Down Expand Up @@ -137,7 +136,7 @@ pub trait InstallManifestsExt {
.await
.context(FileTransferSnafu)?;

kube_client
client
.deploy_manifests(&manifests, product_namespace, labels.clone())
.await
.context(DeployManifestSnafu)?
Expand Down
8 changes: 3 additions & 5 deletions rust/stackable-cockpit/src/platform/namespace.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use snafu::{ResultExt, Snafu};
use snafu::Snafu;

use crate::utils::k8s;
use crate::utils::k8s::{self, Client};

#[derive(Debug, Snafu)]
pub enum Error {
Expand All @@ -13,9 +13,7 @@ pub enum Error {

/// Creates a namespace with `name` if needed (not already present in the
/// cluster).
pub async fn create_if_needed(name: String) -> Result<(), Error> {
let client = k8s::Client::new().await.context(KubeClientCreateSnafu)?;

pub async fn create_if_needed(client: &Client, name: String) -> Result<(), Error> {
client
.create_namespace_if_needed(name)
.await
Expand Down
33 changes: 14 additions & 19 deletions rust/stackable-cockpit/src/platform/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use kube::{api::ListParams, ResourceExt};
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{debug, warn};

use crate::utils::k8s::{self, ListParamsExt};
use crate::utils::k8s::{self, Client, ListParamsExt};

#[derive(Debug, Snafu)]
pub enum Error {
Expand All @@ -40,15 +40,15 @@ pub enum Error {
}

pub async fn get_endpoints(
kube_client: &k8s::Client,
client: &Client,
product_name: &str,
object_name: &str,
object_namespace: &str,
) -> Result<IndexMap<String, String>, Error> {
let list_params =
ListParams::from_product(product_name, Some(object_name), k8s::ProductLabel::Name);

let listeners = kube_client
let listeners = client
.list_listeners(Some(object_namespace), &list_params)
.await;
let listeners = match listeners {
Expand Down Expand Up @@ -92,13 +92,13 @@ pub async fn get_endpoints(
return Ok(endpoints);
}

let services = kube_client
let services = client
.list_services(Some(object_namespace), &list_params)
.await
.context(KubeClientFetchSnafu)?;

for service in services {
match get_endpoint_urls(kube_client, &service, object_name).await {
match get_endpoint_urls(client, &service, object_name).await {
Ok(urls) => endpoints.extend(urls),
Err(err) => warn!(
"Failed to get endpoint_urls of service {service_name}: {err}",
Expand All @@ -111,7 +111,7 @@ pub async fn get_endpoints(
}

pub async fn get_endpoint_urls(
kube_client: &k8s::Client,
client: &Client,
service: &Service,
referenced_object_name: &str,
) -> Result<IndexMap<String, String>, Error> {
Expand All @@ -128,7 +128,7 @@ pub async fn get_endpoint_urls(
let endpoints = match service_spec.type_.as_deref() {
Some("NodePort") => {
get_endpoint_urls_for_nodeport(
kube_client,
client,
&service_name,
service_spec,
&service_namespace,
Expand All @@ -152,13 +152,13 @@ pub async fn get_endpoint_urls(
}

pub async fn get_endpoint_urls_for_nodeport(
kube_client: &k8s::Client,
client: &Client,
service_name: &str,
service_spec: &ServiceSpec,
service_namespace: &str,
referenced_object_name: &str,
) -> Result<IndexMap<String, String>, Error> {
let endpoints = kube_client
let endpoints = client
.get_endpoints(service_namespace, service_name)
.await
.context(KubeClientFetchSnafu)?;
Expand Down Expand Up @@ -191,7 +191,7 @@ pub async fn get_endpoint_urls_for_nodeport(
}
};

let node_ip = get_node_ip(kube_client, node_name).await?;
let node_ip = get_node_ip(client, node_name).await?;

let mut endpoints = IndexMap::new();
for service_port in service_spec.ports.iter().flatten() {
Expand Down Expand Up @@ -265,8 +265,8 @@ pub async fn get_endpoint_urls_for_loadbalancer(
Ok(endpoints)
}

async fn get_node_ip(kube_client: &k8s::Client, node_name: &str) -> Result<String, Error> {
let node_name_ip_mapping = get_node_name_ip_mapping(kube_client).await?;
async fn get_node_ip(client: &Client, node_name: &str) -> Result<String, Error> {
let node_name_ip_mapping = get_node_name_ip_mapping(client).await?;

match node_name_ip_mapping.get(node_name) {
Some(node_ip) => Ok(node_ip.to_string()),
Expand All @@ -276,13 +276,8 @@ async fn get_node_ip(kube_client: &k8s::Client, node_name: &str) -> Result<Strin

// TODO(sbernauer): Add caching. Not going to do so now, as listener-op
// will replace this code entirely anyway.
async fn get_node_name_ip_mapping(
kube_client: &k8s::Client,
) -> Result<HashMap<String, String>, Error> {
let nodes = kube_client
.list_nodes()
.await
.context(KubeClientFetchSnafu)?;
async fn get_node_name_ip_mapping(client: &Client) -> Result<HashMap<String, String>, Error> {
let nodes = client.list_nodes().await.context(KubeClientFetchSnafu)?;

let mut result = HashMap::new();
for node in nodes {
Expand Down
Loading
Loading