Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#624 Set flat priority only for services with traffic splitting #414

Merged
merged 6 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

Lists all changes with user impact.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

## [0.20.13]
### Changed
- Added setting: "zonesAllowingTrafficSplitting", so changes in a config would be made only for envoys in that zone
- Fixed setting priority for traffic splitting endpoints, they will be duplicated with higher priorities

## [0.20.12]
### Changed
- Added "trackRemaining" flag to enable possibility of tracking additional circuit breaker metrics
Expand Down
3 changes: 2 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ Property
**envoy-control.envoy.snapshot.load-balancing.policy** | load balancing policy used for clusters. [Accepted values](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/cluster/v3/cluster.proto#enum-config-cluster-v3-cluster-lbpolicy) | LEAST_REQUEST
**envoy-control.envoy.snapshot.load-balancing.use-keys-subset-fallback-policy** | KEYS_SUBSET fallback policy is used by default when canary and service-tags are enabled. It is not supported in Envoy <= 1.12.x. Set to false for compatibility with Envoy 1.12.x | true
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.zoneName** | a zone to which traffic will be routed if traffic splitting is enabled | ""
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.weights-by-service-properties.** | a map that maps service name to a map [zoneName: weight] | empty map
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.zones-allowing-traffic-splitting** | a zone from which traffic should be splitted | empty list
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.weights-by-service** | a map that maps service name to a map [zoneName: weight] | empty map

## Routing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class ControlPlane private constructor(
val envoySnapshotFactory = EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, envoyHttpFilters, currentZone),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties, currentZone),
endpointsFactory = EnvoyEndpointsFactory(
snapshotProperties,
ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ class EnvoySnapshotFactory(
}
}
}

val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class TrafficSplittingProperties {
var zoneName = ""
var headerName = ""
var weightsByService: Map<String, ZoneWeights> = mapOf()
var zonesAllowingTrafficSplitting = listOf<String>()
kozjan marked this conversation as resolved.
Show resolved Hide resolved
}

class ZoneWeights {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OAuthProvider
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.Threshold
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.SanUriMatcherFactory

typealias EnvoyClusterConfig = io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig

class EnvoyClustersFactory(
private val properties: SnapshotProperties
private val properties: SnapshotProperties,
private val currentZone: String
) {
private val httpProtocolOptions: HttpProtocolOptions = HttpProtocolOptions.newBuilder().setIdleTimeout(
Durations.fromMillis(properties.egress.commonHttp.connectionIdleTimeout.toMillis())
Expand Down Expand Up @@ -283,14 +283,18 @@ class EnvoyClustersFactory(
): Boolean {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val trafficSplitEnabled = trafficSplitting.weightsByService.containsKey(serviceName)
return trafficSplitEnabled && hasEndpointsInZone(clusterLoadAssignment, trafficSplitting)
val allowed = clusterLoadAssignment != null &&
hasEndpointsInZone(clusterLoadAssignment, trafficSplitting.zoneName) &&
trafficSplitting.zonesAllowingTrafficSplitting.contains(currentZone)
return trafficSplitEnabled && allowed
}

private fun hasEndpointsInZone(
clusterLoadAssignment: ClusterLoadAssignment?,
trafficSplitting: TrafficSplittingProperties
zoneName: String
) = clusterLoadAssignment?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone && e.lbEndpointsCount > 0 } ?: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we no longer check if there are any endpoints?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea I think I had some justification when I was deleting it, but now don't recall why, so returned it back

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as I understand you had it tested here and it worked without checking endpoint count?

?.any { e -> zoneName == e.locality.zone && e.lbEndpointsCount > 0 }
?: false

private fun shouldAddDynamicForwardProxyCluster(group: Group) =
group.proxySettings.outgoing.getDomainPatternDependencies().isNotEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class EnvoyEndpointsFactory(
) {
companion object {
private val logger by logger()
private const val HIGHEST_PRIORITY = 0
private const val DEFAULT_WEIGHT = 1
}

fun createLoadAssignment(
Expand Down Expand Up @@ -84,22 +86,48 @@ class EnvoyEndpointsFactory(
return if (routeSpec is WeightRouteSpecification) {
ClusterLoadAssignment.newBuilder(loadAssignment)
.clearEndpoints()
.addAllEndpoints(assignWeights(loadAssignment.endpointsList, routeSpec.clusterWeights))
.addAllEndpoints(
assignWeightsAndDuplicateEndpoints(
loadAssignment.endpointsList,
routeSpec.clusterWeights
)
)
.setClusterName(routeSpec.clusterName)
.build()
} else loadAssignment
}

private fun assignWeights(
llbEndpointsList: List<LocalityLbEndpoints>, weights: ZoneWeights
private fun assignWeightsAndDuplicateEndpoints(
llbEndpointsList: List<LocalityLbEndpoints>,
weights: ZoneWeights
): List<LocalityLbEndpoints> {
if (properties.loadBalancing.trafficSplitting.zonesAllowingTrafficSplitting.contains(currentZone)) {
val endpoints = llbEndpointsList
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(
UInt32Value.of(
weights.weightByZone[it.locality.zone] ?: DEFAULT_WEIGHT
)
)
.build()
} else it
}
return overrideTrafficSplittingZoneEndpointsPriority(endpoints) + endpoints
}
return llbEndpointsList
}

private fun overrideTrafficSplittingZoneEndpointsPriority(
endpoints: List<LocalityLbEndpoints>
): List<LocalityLbEndpoints> {
return endpoints
.filter { properties.loadBalancing.trafficSplitting.zoneName == it.locality.zone }
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(UInt32Value.of(weights.weightByZone[it.locality.zone] ?: 0))
.build()
} else it
LocalityLbEndpoints.newBuilder(it)
.setPriority(HIGHEST_PRIORITY)
.build()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_DISCOVERY_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_IDLE_TIMEOUT
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_PRIORITY
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.HIGHEST_PRIORITY
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_PROPERTIES_WITH_WEIGHTS
Expand Down Expand Up @@ -275,13 +277,18 @@ class EnvoySnapshotFactoryTest {
assertThat(it.endpointsList)
.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE]
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE] &&
e.priority == DEFAULT_PRIORITY
}.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE] &&
e.priority == HIGHEST_PRIORITY
}
.anySatisfy { e ->
e.locality.zone == CURRENT_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[CURRENT_ZONE]
}
.hasSize(2)
.hasSize(3)
}
}

Expand Down Expand Up @@ -313,7 +320,7 @@ class EnvoySnapshotFactoryTest {
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
!e.hasLoadBalancingWeight()
}
.hasSize(2)
.hasSize(3)
}
}

Expand Down Expand Up @@ -457,7 +464,7 @@ class EnvoySnapshotFactoryTest {
CURRENT_ZONE
)
val egressRoutesFactory = EnvoyEgressRoutesFactory(properties)
val clustersFactory = EnvoyClustersFactory(properties)
val clustersFactory = EnvoyClustersFactory(properties, CURRENT_ZONE)
val endpointsFactory = EnvoyEndpointsFactory(properties, ServiceTagMetadataGenerator(), CURRENT_ZONE)
val envoyHttpFilters = EnvoyHttpFilters.defaultFilters(properties)
val listenersFactory = EnvoyListenersFactory(properties, envoyHttpFilters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ class SnapshotUpdaterTest {
EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, currentZone = CURRENT_ZONE),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties, CURRENT_ZONE),
endpointsFactory = EnvoyEndpointsFactory(
snapshotProperties, ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags), CURRENT_ZONE
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,30 @@ package pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters
import io.envoyproxy.controlplane.cache.SnapshotResources
import io.envoyproxy.envoy.config.cluster.v3.Cluster
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME1
import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME2
import pl.allegro.tech.servicemesh.envoycontrol.utils.CURRENT_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TRAFFIC_SPLITTING_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.createAllServicesGroup
import pl.allegro.tech.servicemesh.envoycontrol.utils.createCluster
import pl.allegro.tech.servicemesh.envoycontrol.utils.createClusterConfigurations
import pl.allegro.tech.servicemesh.envoycontrol.utils.createEndpoints
import pl.allegro.tech.servicemesh.envoycontrol.utils.createListenersConfig
import pl.allegro.tech.servicemesh.envoycontrol.utils.createLoadAssignments
import pl.allegro.tech.servicemesh.envoycontrol.utils.createServicesGroup

internal class EnvoyClustersFactoryTest {

companion object {
private val factory = EnvoyClustersFactory(SnapshotProperties())
private val factory = EnvoyClustersFactory(SnapshotProperties(), CURRENT_ZONE)
private val snapshotPropertiesWithWeights = SnapshotProperties().apply {
loadBalancing.trafficSplitting.weightsByService = mapOf(
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
Expand Down Expand Up @@ -96,7 +99,7 @@ internal class EnvoyClustersFactoryTest {
@Test
fun `should get cluster with locality weighted config for group clusters`() {
val cluster1 = createCluster(snapshotPropertiesWithWeights, CLUSTER_NAME1)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights, CURRENT_ZONE)
val result = factory.getClustersForGroup(
createServicesGroup(
snapshotProperties = snapshotPropertiesWithWeights,
Expand All @@ -113,15 +116,39 @@ internal class EnvoyClustersFactoryTest {
}
}

@Test
fun `should not apply locality weighted config if there are no endpoints in the ts zone`() {
val cluster1 = createCluster(snapshotPropertiesWithWeights, CLUSTER_NAME1)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights, CURRENT_ZONE)
val result = factory.getClustersForGroup(
createServicesGroup(
snapshotProperties = snapshotPropertiesWithWeights,
listenersConfig = createListenersConfig(snapshotPropertiesWithWeights, true),
dependencies = arrayOf(CLUSTER_NAME1 to null),
),
createGlobalSnapshot(cluster1, endpoints = null)
)
assertThat(result)
.anySatisfy {
assertThat(it.name).isEqualTo(CLUSTER_NAME1)
assertThat(it.edsClusterConfig).isEqualTo(cluster1.edsClusterConfig)
assertThat(it.commonLbConfig.hasLocalityWeightedLbConfig()).isFalse()
}
}

private fun createGlobalSnapshot(
vararg clusters: Cluster,
securedClusters: List<Cluster> = clusters.asList()
securedClusters: List<Cluster> = clusters.asList(),
endpoints: List<LocalityLbEndpoints>? = createEndpoints()
): GlobalSnapshot {
val clusterLoadAssignment = endpoints
?.let { createLoadAssignments(clusters.toList(), endpoints) }
?: createLoadAssignments(clusters.toList(), listOf())
return GlobalSnapshot(
SnapshotResources.create<Cluster>(clusters.toList(), "pl/allegro/tech/servicemesh/envoycontrol/v3")
.resources(),
clusters.map { it.name }.toSet(),
SnapshotResources.create<ClusterLoadAssignment>(createLoadAssignments(clusters.toList()), "v1").resources(),
SnapshotResources.create<ClusterLoadAssignment>(clusterLoadAssignment, "v1").resources(),
createClusterConfigurations(),
SnapshotResources.create<Cluster>(securedClusters, "v3").resources()
)
Expand Down
Loading
Loading