Skip to content

Commit

Permalink
#624 Set flat priority only for services with traffic splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Apr 22, 2024
1 parent 8e902af commit ced0295
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 89 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

Lists all changes with user impact.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

## [0.20.13]
### Changed
- Added setting: "zonesAllowingTrafficSplitting", so changes in a config would be made only for envoys in that zone
- Fixed setting priority for traffic splitting endpoints, they will be duplicated with higher priorities

## [0.20.12]
### Changed
- Added "trackRemaining" flag to enable possibility of tracking additional circuit breaker metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class ControlPlane private constructor(
val envoySnapshotFactory = EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, envoyHttpFilters, currentZone),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties, currentZone),
endpointsFactory = EnvoyEndpointsFactory(
snapshotProperties,
ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ class EnvoySnapshotFactory(
}
}
}

val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class TrafficSplittingProperties {
var zoneName = ""
var headerName = ""
var weightsByService: Map<String, ZoneWeights> = mapOf()
var zonesAllowingTrafficSplitting = listOf<String>()
}

class ZoneWeights {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OAuthProvider
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.Threshold
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.SanUriMatcherFactory

typealias EnvoyClusterConfig = io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig

class EnvoyClustersFactory(
private val properties: SnapshotProperties
private val properties: SnapshotProperties,
private val currentZone: String
) {
private val httpProtocolOptions: HttpProtocolOptions = HttpProtocolOptions.newBuilder().setIdleTimeout(
Durations.fromMillis(properties.egress.commonHttp.connectionIdleTimeout.toMillis())
Expand Down Expand Up @@ -283,15 +283,14 @@ class EnvoyClustersFactory(
): Boolean {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val trafficSplitEnabled = trafficSplitting.weightsByService.containsKey(serviceName)
return trafficSplitEnabled && hasEndpointsInZone(clusterLoadAssignment, trafficSplitting)
val allowed = clusterLoadAssignment != null &&
properties.loadBalancing.trafficSplitting.zonesAllowingTrafficSplitting.contains(currentZone)
if (serviceName == "varnish" || serviceName == "service-mesh-service-second") {
logger.info("trafficSplitEnabled $trafficSplitEnabled allowed $allowed, $currentZone")
}
return trafficSplitEnabled && allowed
}

private fun hasEndpointsInZone(
clusterLoadAssignment: ClusterLoadAssignment?,
trafficSplitting: TrafficSplittingProperties
) = clusterLoadAssignment?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone && e.lbEndpointsCount > 0 } ?: false

private fun shouldAddDynamicForwardProxyCluster(group: Group) =
group.proxySettings.outgoing.getDomainPatternDependencies().isNotEmpty()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class EnvoyEndpointsFactory(
) {
companion object {
private val logger by logger()
private const val HIGHEST_PRIORITY = 0
private const val DEFAULT_WEIGHT = 1
}

fun createLoadAssignment(
Expand Down Expand Up @@ -84,22 +86,48 @@ class EnvoyEndpointsFactory(
return if (routeSpec is WeightRouteSpecification) {
ClusterLoadAssignment.newBuilder(loadAssignment)
.clearEndpoints()
.addAllEndpoints(assignWeights(loadAssignment.endpointsList, routeSpec.clusterWeights))
.addAllEndpoints(
assignWeightsAndDuplicateEndpoints(
loadAssignment.endpointsList,
routeSpec.clusterWeights
)
)
.setClusterName(routeSpec.clusterName)
.build()
} else loadAssignment
}

private fun assignWeights(
llbEndpointsList: List<LocalityLbEndpoints>, weights: ZoneWeights
private fun assignWeightsAndDuplicateEndpoints(
llbEndpointsList: List<LocalityLbEndpoints>,
weights: ZoneWeights
): List<LocalityLbEndpoints> {
if (properties.loadBalancing.trafficSplitting.zonesAllowingTrafficSplitting.contains(currentZone)) {
val endpoints = llbEndpointsList
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(
UInt32Value.of(
weights.weightByZone[it.locality.zone] ?: DEFAULT_WEIGHT
)
)
.build()
} else it
}
return overrideTrafficSplittingZoneEndpointsPriority(endpoints) + endpoints
}
return llbEndpointsList
}

private fun overrideTrafficSplittingZoneEndpointsPriority(
endpoints: List<LocalityLbEndpoints>
): List<LocalityLbEndpoints> {
return endpoints
.filter { properties.loadBalancing.trafficSplitting.zoneName == it.locality.zone }
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(UInt32Value.of(weights.weightByZone[it.locality.zone] ?: 0))
.build()
} else it
LocalityLbEndpoints.newBuilder(it)
.setPriority(HIGHEST_PRIORITY)
.build()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_DISCOVERY_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_IDLE_TIMEOUT
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_PRIORITY
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.HIGHEST_PRIORITY
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_PROPERTIES_WITH_WEIGHTS
Expand Down Expand Up @@ -275,13 +277,18 @@ class EnvoySnapshotFactoryTest {
assertThat(it.endpointsList)
.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE]
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE] &&
e.priority == DEFAULT_PRIORITY
}.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE] &&
e.priority == HIGHEST_PRIORITY
}
.anySatisfy { e ->
e.locality.zone == CURRENT_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[CURRENT_ZONE]
}
.hasSize(2)
.hasSize(3)
}
}

Expand Down Expand Up @@ -313,7 +320,7 @@ class EnvoySnapshotFactoryTest {
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
!e.hasLoadBalancingWeight()
}
.hasSize(2)
.hasSize(3)
}
}

Expand Down Expand Up @@ -457,7 +464,7 @@ class EnvoySnapshotFactoryTest {
CURRENT_ZONE
)
val egressRoutesFactory = EnvoyEgressRoutesFactory(properties)
val clustersFactory = EnvoyClustersFactory(properties)
val clustersFactory = EnvoyClustersFactory(properties, CURRENT_ZONE)
val endpointsFactory = EnvoyEndpointsFactory(properties, ServiceTagMetadataGenerator(), CURRENT_ZONE)
val envoyHttpFilters = EnvoyHttpFilters.defaultFilters(properties)
val listenersFactory = EnvoyListenersFactory(properties, envoyHttpFilters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ class SnapshotUpdaterTest {
EnvoySnapshotFactory(
ingressRoutesFactory = EnvoyIngressRoutesFactory(snapshotProperties, currentZone = CURRENT_ZONE),
egressRoutesFactory = EnvoyEgressRoutesFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties),
clustersFactory = EnvoyClustersFactory(snapshotProperties, CURRENT_ZONE),
endpointsFactory = EnvoyEndpointsFactory(
snapshotProperties, ServiceTagMetadataGenerator(snapshotProperties.routing.serviceTags), CURRENT_ZONE
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME1
import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_NAME2
import pl.allegro.tech.servicemesh.envoycontrol.utils.CURRENT_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TRAFFIC_SPLITTING_ZONE
Expand All @@ -23,7 +24,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.createServicesGroup
internal class EnvoyClustersFactoryTest {

companion object {
private val factory = EnvoyClustersFactory(SnapshotProperties())
private val factory = EnvoyClustersFactory(SnapshotProperties(), CURRENT_ZONE)
private val snapshotPropertiesWithWeights = SnapshotProperties().apply {
loadBalancing.trafficSplitting.weightsByService = mapOf(
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
Expand Down Expand Up @@ -96,7 +97,7 @@ internal class EnvoyClustersFactoryTest {
@Test
fun `should get cluster with locality weighted config for group clusters`() {
val cluster1 = createCluster(snapshotPropertiesWithWeights, CLUSTER_NAME1)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights)
val factory = EnvoyClustersFactory(snapshotPropertiesWithWeights, CURRENT_ZONE)
val result = factory.getClustersForGroup(
createServicesGroup(
snapshotProperties = snapshotPropertiesWithWeights,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings
import pl.allegro.tech.servicemesh.envoycontrol.groups.RoutingPolicy
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.Locality
Expand All @@ -19,6 +20,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingPriorityProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ZoneWeights
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Stream

Expand All @@ -33,13 +37,13 @@ internal class EnvoyEndpointsFactoryTest {
),
"DC2" to mapOf(
"DC1" to 1,
"DC2" to 2,
"DC3" to 3
"DC2" to 0,
"DC3" to 2
),
"DC3" to mapOf(
"DC1" to 2,
"DC2" to 3,
"DC3" to 4
"DC1" to 1,
"DC2" to 2,
"DC3" to 0
)
)

Expand All @@ -52,6 +56,7 @@ internal class EnvoyEndpointsFactoryTest {
private val serviceName = "service-one"

private val defaultZone = "DC1"
private val trafficSplittingZone = "DC2"

private val endpointsFactory = EnvoyEndpointsFactory(
SnapshotProperties().apply {
Expand All @@ -77,6 +82,31 @@ internal class EnvoyEndpointsFactoryTest {
)
)

private val defaultZoneWeights = mapOf(
"DC1" to 100,
"DC2" to 10,
"DC3" to 2
)

private val snapshotPropertiesWithWeights = SnapshotProperties().apply {
loadBalancing = LoadBalancingProperties()
.apply {
priorities = LoadBalancingPriorityProperties()
.apply { zonePriorities = dcPriorityProperties }
trafficSplitting = TrafficSplittingProperties()
.apply {
zoneName = trafficSplittingZone
zonesAllowingTrafficSplitting = listOf("DC1")
weightsByService = mapOf(
serviceName to ZoneWeights()
.apply {
weightByZone = defaultZoneWeights
}
)
}
}
}

// language=json
private val globalLoadAssignmentJson = """{
"cluster_name": "lorem-service",
Expand Down Expand Up @@ -354,6 +384,28 @@ internal class EnvoyEndpointsFactoryTest {
)
}

@Test
fun `should override priority and duplicate endpoints for traffic splitting zone`() {
val envoyEndpointsFactory = EnvoyEndpointsFactory(
snapshotPropertiesWithWeights,
currentZone = "DC1"
)
val loadAssignments = envoyEndpointsFactory.createLoadAssignment(setOf(serviceName), multiClusterStateDC1Local)
var resultLoadAssignment = envoyEndpointsFactory.assignLocalityWeights(
WeightRouteSpecification(
serviceName, listOf(), DependencySettings(), ZoneWeights().apply { weightByZone = defaultZoneWeights }
),
loadAssignments[0]
)

assertThat(resultLoadAssignment.endpointsList).hasSize(dcPriorityProperties.size + 1)
assertThat(resultLoadAssignment.endpointsList)
.anySatisfy { it.hasZoneWithPriority("DC2", 1) }
.anySatisfy { it.hasZoneWithPriority("DC2", 0) }
.anySatisfy { it.hasZoneWithPriority("DC1", 0) }
.anySatisfy { it.hasZoneWithPriority("DC3", 2) }
}

private fun List<ClusterLoadAssignment>.assertHasLoadAssignment(map: Map<String, Int>) {
assertThat(this)
.isNotEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ fun createEndpoint(zone: String): LocalityLbEndpoints {
.build()
)
.addAllLbEndpoints(listOf(LbEndpoint.getDefaultInstance()))
.setPriority(DEFAULT_PRIORITY)
.build()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const val CLUSTER_NAME1 = "cluster-1"
const val CLUSTER_NAME2 = "cluster-2"
const val TRAFFIC_SPLITTING_ZONE = "dc2"
const val CURRENT_ZONE = "dc1"
const val DEFAULT_PRIORITY = 1
const val HIGHEST_PRIORITY = 0

val DEFAULT_CLUSTER_WEIGHTS = zoneWeights(mapOf(CURRENT_ZONE to 60, TRAFFIC_SPLITTING_ZONE to 40))

Expand All @@ -24,6 +26,7 @@ val SNAPSHOT_PROPERTIES_WITH_WEIGHTS = SnapshotProperties().also {
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
)
it.loadBalancing.trafficSplitting.zoneName = TRAFFIC_SPLITTING_ZONE
it.loadBalancing.trafficSplitting.zonesAllowingTrafficSplitting = listOf(CURRENT_ZONE)
}

fun zoneWeights(weightByZone: Map<String, Int>) = ZoneWeights().also {
Expand Down
Loading

0 comments on commit ced0295

Please sign in to comment.