Skip to content

Commit

Permalink
requested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuajerin committed Aug 26, 2024
1 parent b4e364c commit 8cee166
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 79 deletions.
2 changes: 1 addition & 1 deletion tembo-operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl CoreDB {
Action::requeue(Duration::from_secs(300))
})?;

reconcile_dedicated_networking(self, ctx.clone())
reconcile_dedicated_networking(self, ctx.clone(), basedomain.as_str())
.await
.map_err(|e| {
error!("Error reconciling dedicated networking: {:?}", e);
Expand Down
145 changes: 76 additions & 69 deletions tembo-operator/src/dedicated_networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tracing::{debug, error, info};
pub async fn reconcile_dedicated_networking(
cdb: &CoreDB,
ctx: Arc<Context>,
basedomain: &str,
) -> Result<(), OperatorError> {
let ns = cdb.namespace().unwrap_or_else(|| {
error!(
Expand All @@ -37,24 +38,23 @@ pub async fn reconcile_dedicated_networking(
);
"default".to_string()
});
let basedomain = env::var("DATA_PLANE_BASEDOMAIN").unwrap_or_else(|_| "localhost".to_string());
let port = IntOrString::Int(5432);
let client = ctx.client.clone();

info!(
debug!(
"Starting reconciliation of dedicated networking for CoreDB instance: {} in namespace: {}",
cdb.name_any(),
ns
);

if let Some(dedicated_networking) = &cdb.spec.dedicatedNetworking {
if dedicated_networking.enabled {
info!(
debug!(
"Dedicated networking is enabled for CoreDB instance: {}",
cdb.name_any()
);

info!(
debug!(
"Reconciling network policies for dedicated networking in namespace: {}",
ns
);
Expand All @@ -65,7 +65,7 @@ pub async fn reconcile_dedicated_networking(
e
})?;

info!(
debug!(
"Reconciling IP allow list middleware for CoreDB instance: {}",
cdb.name_any()
);
Expand All @@ -76,7 +76,7 @@ pub async fn reconcile_dedicated_networking(
e
})?;

info!(
debug!(
"Handling primary service ingress for CoreDB instance: {}",
cdb.name_any()
);
Expand All @@ -98,7 +98,7 @@ pub async fn reconcile_dedicated_networking(
cdb,
ctx.clone(),
&ns,
&basedomain,
basedomain,
"dedicated",
&format!("{}-dedicated", cdb.name_any()),
port.clone(),
Expand Down Expand Up @@ -135,7 +135,7 @@ pub async fn reconcile_dedicated_networking(
cdb,
ctx.clone(),
&ns,
&basedomain,
basedomain,
"dedicated-ro",
&format!("{}-dedicated-ro", cdb.name_any()),
port.clone(),
Expand Down Expand Up @@ -173,7 +173,7 @@ pub async fn reconcile_dedicated_networking(
cdb,
ctx.clone(),
&ns,
&basedomain,
basedomain,
"dedicated",
&format!("{}-dedicated", cdb.name_any()),
port.clone(),
Expand All @@ -191,7 +191,7 @@ pub async fn reconcile_dedicated_networking(
cdb,
ctx,
&ns,
&basedomain,
basedomain,
"dedicated-ro",
&format!("{}-dedicated-ro", cdb.name_any()),
port,
Expand Down Expand Up @@ -237,17 +237,40 @@ async fn reconcile_dedicated_networking_network_policies(
) -> Result<(), OperatorError> {
let cdb_name = cdb.name_any();
let np_api: Api<NetworkPolicy> = Api::namespaced(client, namespace);
let cidr = match cdb.spec.ip_allow_list.clone() {
None => "0.0.0.0/0".to_string(),
Some(ips) => ips.get(0).cloned().unwrap_or("0.0.0.0/0".to_string()),
};

let cidr_list = env::var("CLOUD_LOAD_BALANCER_INTERNAL_IP_CIDR")
.unwrap_or_else(|_| "10.0.0.0/8".to_string())
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<String>>();

let policy_name = format!("{}-allow-nlb", cdb_name);
info!(
"Applying network policy: {} in namespace: {} to allow traffic from CIDR: {}",
policy_name, namespace, cidr
"Applying network policy: {} in namespace: {} to allow traffic from CIDRs: {:?}",
policy_name, namespace, cidr_list
);

let ingress_rules = cidr_list
.into_iter()
.map(|cidr| {
serde_json::json!({
"from": [
{
"ipBlock": {
"cidr": cidr
}
}
],
"ports": [
{
"protocol": "TCP",
"port": 5432
}
]
})
})
.collect::<Vec<_>>();

let dedicated_network_policy = serde_json::json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
Expand All @@ -262,23 +285,7 @@ async fn reconcile_dedicated_networking_network_policies(
}
},
"policyTypes": ["Ingress"],
"ingress": [
{
"from": [
{
"ipBlock": {
"cidr": cidr
}
}
],
"ports": [
{
"protocol": "TCP",
"port": 5432
}
]
}
]
"ingress": ingress_rules
}
});

Expand Down Expand Up @@ -389,45 +396,43 @@ async fn reconcile_dedicated_networking_service(
};
let lb_internal = if is_public { "false" } else { "true" };

info!(
debug!(
"Applying Service: {} in namespace: {} with type: {} and scheme: {}",
service_name, namespace, service_type, lb_scheme
);

let mut annotations = serde_json::Map::new();
annotations.insert(
"external-dns.alpha.kubernetes.io/hostname".to_string(),
serde_json::Value::String(format!("{}.{}", service_name, "example.com")),
serde_json::Value::String(format!("{}", namespace)),
);

if service_type == "LoadBalancer" {
annotations.extend([
(
"service.beta.kubernetes.io/aws-load-balancer-internal".to_string(),
serde_json::Value::String(lb_scheme.to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-scheme".to_string(),
serde_json::Value::String(lb_internal.to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-nlb-target-type".to_string(),
serde_json::Value::String("ip".to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-type".to_string(),
serde_json::Value::String("nlb-ip".to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-healthcheck-protocol".to_string(),
serde_json::Value::String("TCP".to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-healthcheck-port".to_string(),
serde_json::Value::String("5432".to_string()),
),
]);
}
annotations.extend([
(
"service.beta.kubernetes.io/aws-load-balancer-internal".to_string(),
serde_json::Value::String(lb_scheme.to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-scheme".to_string(),
serde_json::Value::String(lb_internal.to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-nlb-target-type".to_string(),
serde_json::Value::String("ip".to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-type".to_string(),
serde_json::Value::String("nlb-ip".to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-healthcheck-protocol".to_string(),
serde_json::Value::String("TCP".to_string()),
),
(
"service.beta.kubernetes.io/aws-load-balancer-healthcheck-port".to_string(),
serde_json::Value::String("5432".to_string()),
),
]);

let mut labels = serde_json::Map::new();
labels.insert(
Expand Down Expand Up @@ -462,10 +467,12 @@ async fn reconcile_dedicated_networking_service(
service_spec.insert("type".to_string(), json!(service_type));

if service_type == "LoadBalancer" {
let load_balancer_source_ranges = match cdb.spec.ip_allow_list.clone() {
None => vec!["0.0.0.0/0".to_string()],
Some(ips) => ips,
};
let load_balancer_source_ranges = env::var("CLOUD_LOAD_BALANCER_INTERNAL_IP_CIDR")
.unwrap_or_else(|_| "10.0.0.0/8".to_string())
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<String>>();

service_spec.insert(
"loadBalancerSourceRanges".to_string(),
json!(load_balancer_source_ranges),
Expand All @@ -486,7 +493,7 @@ async fn reconcile_dedicated_networking_service(
});

let svc_api: Api<Service> = Api::namespaced(client, namespace);
let patch_params = PatchParams::apply("conductor").force();
let patch_params = PatchParams::apply("cntrlr").force();
let patch = Patch::Apply(&service);

svc_api
Expand Down Expand Up @@ -531,7 +538,7 @@ async fn delete_dedicated_networking_service(

let svc_api: Api<Service> = Api::namespaced(client, namespace);

info!(
debug!(
"Checking if service: {} exists in namespace: {} for deletion",
service_name, namespace
);
Expand Down
6 changes: 0 additions & 6 deletions tembo-operator/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,6 @@ pub async fn reconcile_postgres_ing_route_tcp(
middleware_names: Vec<String>,
delete: bool,
) -> Result<(), OperatorError> {
// Check if the basedomain is set
if basedomain.is_empty() {
debug!("Base domain is not set. Skipping IngressRouteTCP reconciliation.");
return Ok(());
}

let client = ctx.client.clone();
// Initialize kube api for ingress route tcp
let ingress_route_tcp_api: Api<IngressRouteTCP> = Api::namespaced(client, namespace);
Expand Down
4 changes: 1 addition & 3 deletions tembo-operator/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1906,7 +1906,7 @@ mod test {

let mut rng = rand::thread_rng();
let suffix = rng.gen_range(0..100000);
let name = &format!("test-networking-{}", suffix.clone());
let name = &format!("example-dedicated-networking-{}", suffix.clone());
let namespace = match create_namespace(client.clone(), name).await {
Ok(namespace) => namespace,
Err(e) => {
Expand Down Expand Up @@ -2105,8 +2105,6 @@ mod test {
let patch = Patch::Apply(&coredb_json);
let _coredb_resource = coredbs.patch(name, &params, &patch).await.unwrap();

tokio::time::sleep(Duration::from_secs(10)).await;

let service_dedicated = service_exists(
context.clone(),
&namespace,
Expand Down

0 comments on commit 8cee166

Please sign in to comment.