Skip to content

Commit

Permalink
Merge pull request #79 from influxdata/fix/handle-cluster-scoped-reso…
Browse files Browse the repository at this point in the history
…urces

fix: handle cluster scoped resources
  • Loading branch information
lukebond authored Sep 29, 2023
2 parents 609ab5e + 1097b2e commit a30a553
Showing 1 changed file with 166 additions and 28 deletions.
194 changes: 166 additions & 28 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use kube::{
},
Api, Client, Config, Resource, ResourceExt,
};
use serde_json::json;
use serde_json::{json, Value};

use crate::{
mapping::set_field_path,
Expand Down Expand Up @@ -76,7 +76,7 @@ async fn cluster_client(
struct NamespacedApi {
api: Api<DynamicObject>,
ar: ApiResource,
namespace: String,
namespace: Option<String>,
}

async fn api_for(
Expand All @@ -97,17 +97,14 @@ async fn api_for(
Some(cluster) => match &cluster.namespace {
Some(namespace) => (
Api::namespaced_with(client.clone(), namespace, &ar),
namespace.to_owned(),
Some(namespace.to_owned()),
),
None => (
Api::default_namespaced_with(client.clone(), &ar),
client.default_namespace().to_owned(),
Some(client.default_namespace().to_owned()),
),
},
None => (
Api::namespaced_with(client.clone(), local_ns, &ar),
local_ns.to_owned(),
),
None => (Api::all_with(client.clone(), &ar), None),
};

Ok(NamespacedApi { api, ar, namespace })
Expand All @@ -132,12 +129,18 @@ async fn reconcile(sinker: Arc<ResourceSync>, ctx: Arc<Context>) -> Result<Actio
namespace: target_namespace,
} = api_for(&sinker.spec.target, &local_ns, Arc::clone(&ctx)).await?;

debug!(%target_namespace, "got client for target");
debug!(?target_namespace, "got client for target");

let target = if sinker.spec.mappings.is_empty() {
clone_resource(&source, target_ref, &target_namespace, &ar)?
clone_resource(&source, target_ref, target_namespace.as_deref(), &ar)?
} else {
apply_mappings(&source, target_ref, &target_namespace, &ar, &sinker)?
apply_mappings(
&source,
target_ref,
target_namespace.as_deref(),
&ar,
&sinker,
)?
};
debug!(?target, "produced target object");

Expand All @@ -155,12 +158,11 @@ async fn reconcile(sinker: Arc<ResourceSync>, ctx: Arc<Context>) -> Result<Actio
fn clone_resource(
source: &DynamicObject,
target_ref: &GVKN,
target_namespace: &str,
target_namespace: Option<&str>,
ar: &ApiResource,
) -> Result<DynamicObject> {
let mut target = DynamicObject::new(&target_ref.name, ar)
.within(target_namespace)
.data(source.data.clone());
let mut target = DynamicObject::new(&target_ref.name, ar).data(source.data.clone());
target.metadata.namespace = target_namespace.map(String::from);

target.metadata.annotations = source.metadata.annotations.clone().map(cleanup_annotations);
target.metadata.labels = source.metadata.labels.clone();
Expand All @@ -172,13 +174,12 @@ fn clone_resource(
fn apply_mappings(
source: &DynamicObject,
target_ref: &GVKN,
target_namespace: &str,
target_namespace: Option<&str>,
ar: &ApiResource,
sinker: &ResourceSync,
) -> Result<DynamicObject> {
let mut template = DynamicObject::new(&target_ref.name, ar)
.within(target_namespace)
.data(json!({}));
let mut template = DynamicObject::new(&target_ref.name, ar).data(json!({}));
template.metadata.namespace = target_namespace.map(String::from);

for mapping in &sinker.spec.mappings {
let subtree = find_field_path(source, &mapping.from_field_path)?;
Expand Down Expand Up @@ -210,12 +211,16 @@ fn apply_mappings(
let source_metadata = convert_metadata(&subtree["metadata"]);
let mut subtree = subtree.clone();
cleanup_subtree(&mut subtree);
let mut source = DynamicObject::new(&subtree["metadata"]["name"].to_string(), &ar)
.within(&subtree["metadata"]["namespace"].to_string())
.data(subtree);
let mut source = if let Value::Null = subtree["metadata"]["namespace"] {
DynamicObject::new(&subtree["metadata"]["name"].to_string(), &ar).data(subtree)
} else {
DynamicObject::new(&subtree["metadata"]["name"].to_string(), &ar)
.within(&subtree["metadata"]["namespace"].to_string())
.data(subtree)
};
source.metadata.annotations = source_metadata.annotations;
source.metadata.labels = source_metadata.labels;
template = clone_resource(&source, target_ref, &target_namespace, &ar)?;
template = clone_resource(&source, target_ref, target_namespace, &ar)?;
}
Mapping {
from_field_path: _,
Expand Down Expand Up @@ -450,7 +455,68 @@ mod tests {
let target = clone_resource(
&dynamic_sc,
&resource_sync.spec.target.resource_ref,
"default",
Some("default"),
&ar,
)
.unwrap();
assert_eq!(
serde_json::to_string(&target).unwrap(),
serde_json::to_string(&expected).unwrap(),
);
}

#[tokio::test]
async fn test_clone_resource_cluster_scoped() {
let resource_sync = ResourceSync::new(
"sinker-test",
ResourceSyncSpec {
mappings: vec![],
source: ClusterResourceRef {
resource_ref: GVKN {
api_version: "rbac.authorization.k8s.io/v1".to_string(),
kind: "ClusterRole".to_string(),
name: "test-clusterrole-1".to_string(),
},
cluster: None,
},
target: ClusterResourceRef {
resource_ref: GVKN {
api_version: "rbac.authorization.k8s.io/v1".to_string(),
kind: "ClusterRole".to_string(),
name: "test-clusterrole-2".to_string(),
},
cluster: None,
},
},
);
let dynamic_sc: DynamicObject = serde_json::from_str(
&serde_json::to_string(&serde_json::json!({
"apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "ClusterRole",
"metadata": { "name": "test-clusterrole-1" },
"rules": [],
}))
.unwrap(),
)
.unwrap();
let expected = serde_json::json!({
"apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "ClusterRole",
"metadata": {
"name": "test-clusterrole-2",
"namespace": "default",
},
"rules": [],
});
let ar = ApiResource::from_gvk(&GroupVersionKind {
group: "rbac.authorization.k8s.io".to_string(),
version: "v1".to_string(),
kind: "ClusterRole".to_string(),
});
let target = clone_resource(
&dynamic_sc,
&resource_sync.spec.target.resource_ref,
Some("default"),
&ar,
)
.unwrap();
Expand Down Expand Up @@ -522,7 +588,7 @@ mod tests {
let target = apply_mappings(
&dynamic_sc,
&resource_sync.spec.target.resource_ref,
"default",
Some("default"),
&ar,
&resource_sync,
)
Expand Down Expand Up @@ -605,7 +671,7 @@ mod tests {
let target = apply_mappings(
&dynamic_sc,
&resource_sync.spec.target.resource_ref,
"default",
Some("default"),
&ar,
&resource_sync,
)
Expand Down Expand Up @@ -689,7 +755,79 @@ mod tests {
let target = apply_mappings(
&dynamic_sc,
&resource_sync.spec.target.resource_ref,
"default",
Some("default"),
&ar,
&resource_sync,
)
.unwrap();
assert_eq!(
serde_json::to_string(&target).unwrap(),
serde_json::to_string(&expected).unwrap(),
);
}

#[tokio::test]
async fn test_apply_mappings_from_sinkercontainer_clusterscoped() {
let resource_sync = ResourceSync::new(
"sinker-test",
ResourceSyncSpec {
mappings: vec![Mapping {
from_field_path: Some("spec".to_string()),
to_field_path: None,
}],
source: ClusterResourceRef {
resource_ref: GVKN {
api_version: "sinker.influxdata.io/v1alpha1".to_string(),
kind: "SinkerContainer".to_string(),
name: "test-sinker-container".to_string(),
},
cluster: None,
},
target: ClusterResourceRef {
resource_ref: GVKN {
api_version: "rbac.authorization.k8s.io/v1".to_string(),
kind: "ClusterRole".to_string(),
name: "test-clusterrole".to_string(),
},
cluster: None,
},
},
);
let ar = ApiResource::from_gvk(&GroupVersionKind {
group: "rbac.authorization.k8s.io".to_string(),
version: "v1".to_string(),
kind: "ClusterRole".to_string(),
});
let expected = serde_json::json!({
"apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "ClusterRole",
"metadata": {
"name": "test-clusterrole",
"namespace": "default",
},
"rules": [],
});
let dynamic_sc: DynamicObject = serde_json::from_str(
&serde_json::to_string(&serde_json::json!({
"apiVersion": "sinker.influxdata.io/v1alpha1",
"kind": "SinkerContainer",
"metadata": { "name": "test-sinker-container" },
"spec": {
"apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "ClusterRole",
"metadata": {
"name": "test-clusterrole",
},
"rules": [],
},
}))
.unwrap(),
)
.unwrap();
let target = apply_mappings(
&dynamic_sc,
&resource_sync.spec.target.resource_ref,
Some("default"),
&ar,
&resource_sync,
)
Expand Down Expand Up @@ -764,7 +902,7 @@ mod tests {
let target = apply_mappings(
&dynamic_sc,
&resource_sync.spec.target.resource_ref,
"default",
Some("default"),
&ar,
&resource_sync,
)
Expand Down

0 comments on commit a30a553

Please sign in to comment.