Skip to content

Commit

Permalink
Implemented possibility for configuring traffic splitting, and fallba…
Browse files Browse the repository at this point in the history
…ck using aggregate cluster| fixed tests #292
  • Loading branch information
nastassia-dailidava committed Aug 26, 2023
1 parent 919cff0 commit a3aed17
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class EnvoySnapshotFactory(
clusterName = it.getClusterName(),
routeDomains = listOf(it.getRouteDomain()),
settings = it.settings,
clusterWeights = ClusterWeights()
clusterWeights = mapOf()
)
}
)
Expand All @@ -179,7 +179,7 @@ class EnvoySnapshotFactory(
clusterName = properties.dynamicForwardProxy.clusterName,
routeDomains = group.proxySettings.outgoing.getDomainPatternDependencies().map { it.domainPattern },
settings = group.proxySettings.outgoing.defaultServiceSettings,
clusterWeights = ClusterWeights()
clusterWeights = mapOf()
)
}

Expand Down Expand Up @@ -218,13 +218,13 @@ class EnvoySnapshotFactory(
serviceName: String,
globalSnapshot: GlobalSnapshot,
dependencyServiceName: String
): ClusterWeights {
): Map<String, Int> {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val weights = trafficSplitting.serviceByWeightsProperties[serviceName]
val weights = trafficSplitting.serviceByWeightsProperties[serviceName] ?: mapOf()
val enabledForDependency = globalSnapshot.endpoints[dependencyServiceName]?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone }
?: false
return if (enabledForDependency && weights != null) weights else ClusterWeights()
return if (enabledForDependency) weights else mapOf()
}

private fun getServiceWithCustomDomain(it: String): List<String> {
Expand Down Expand Up @@ -400,5 +400,5 @@ class RouteSpecification(
val clusterName: String,
val routeDomains: List<String>,
val settings: DependencySettings,
val clusterWeights: ClusterWeights = ClusterWeights()
val clusterWeights: Map<String, Int> = mapOf()
)
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,7 @@ class CanaryProperties {

class TrafficSplittingProperties {
var zoneName = ""
var serviceByWeightsProperties: Map<String, ClusterWeights> = mapOf()
}

class ClusterWeights {
var mainClusterWeight: Int = 0
var secondaryClusterWeight: Int = 0
var serviceByWeightsProperties: Map<String, Map<String, Int>> = mapOf()
}

class LoadBalancingWeightsProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ class EnvoyClustersFactory(
const val SECONDARY_CLUSTER_POSTFIX = "secondary"
const val AGGREGATE_CLUSTER_POSTFIX = "aggregate"

@JvmStatic
fun getSecondaryClusterName(serviceName: String): String {
return "$serviceName-$SECONDARY_CLUSTER_POSTFIX"
}

@JvmStatic
fun getAggregateClusterName(serviceName: String): String {
return "$serviceName-$AGGREGATE_CLUSTER_POSTFIX"
}
Expand Down Expand Up @@ -266,7 +268,7 @@ class EnvoyClustersFactory(
getSecondaryClusterName(cluster.name)
)
val aggregateCluster =
createAggregateCluster(mainCluster.name, listOf(secondaryCluster.name, mainCluster.name))
createAggregateCluster(mainCluster.name, linkedSetOf(secondaryCluster.name, mainCluster.name))
return listOf(mainCluster, secondaryCluster, aggregateCluster)
.also { logger.debug("Created traffic splitting clusters: {}", it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class EnvoyEndpointsFactory(
egressRouteSpecifications: Collection<RouteSpecification>
): List<ClusterLoadAssignment> {
return egressRouteSpecifications
.filter { it.clusterWeights.mainClusterWeight > 0 && it.clusterWeights.secondaryClusterWeight > 0 }
.filter { it.clusterWeights.isNotEmpty() }
.onEach { logger.debug("Traffic splitting is enabled for cluster: ${it.clusterName}") }
.mapNotNull { routeSpec ->
clusterLoadAssignments[routeSpec.clusterName]?.let { assignment ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,23 +345,21 @@ class EnvoyEgressRoutesFactory(
}

private fun RouteAction.Builder.setCluster(routeSpec: RouteSpecification): RouteAction.Builder {
val clusterWeights = routeSpec.clusterWeights
val hasWeightsConfig = clusterWeights.mainClusterWeight > 0 &&
clusterWeights.secondaryClusterWeight > 0
val hasWeightsConfig = routeSpec.clusterWeights.keys.containsAll(listOf("main", "secondary"))
return if (!hasWeightsConfig) {
this.setCluster(routeSpec.clusterName)
} else {
logger.debug(
"Creating weighted cluster configuration for route spec {}, {}",
routeSpec.clusterName,
clusterWeights
routeSpec.clusterWeights
)
this.setWeightedClusters(
WeightedCluster.newBuilder()
.withClusterWeight(routeSpec.clusterName, clusterWeights.mainClusterWeight)
.withClusterWeight(routeSpec.clusterName, routeSpec.clusterWeights["main"]!!)
.withClusterWeight(
getSecondaryClusterName(routeSpec.clusterName),
clusterWeights.secondaryClusterWeight
routeSpec.clusterWeights["secondary"]!!
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,35 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyEg
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.EnvoyIngressRoutesFactory
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.serviceDependencies
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.AGGREGATE_CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.CLUSTER_NAME1
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.CLUSTER_NAME2
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.CURRENT_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.DEFAULT_DISCOVERY_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.DEFAULT_IDLE_TIMEOUT
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.EGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.EGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.INGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.INGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.MAIN_CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.SECONDARY_CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.createCluster
import pl.allegro.tech.servicemesh.envoycontrol.utils.createClusterConfigurations
import pl.allegro.tech.servicemesh.envoycontrol.utils.createLoadAssignments
import pl.allegro.tech.servicemesh.envoycontrol.utils.createEndpoints

class EnvoySnapshotFactoryTest {
companion object {
const val CLUSTER_NAME = "cluster-name"
const val DEFAULT_IDLE_TIMEOUT = 100L
const val CURRENT_ZONE = "dc1"
const val FORCE_TRAFFIC_ZONE = "dc2"
const val MAIN_CLUSTER_NAME = "service-name-2"
const val SECONDARY_CLUSTER_NAME = "service-name-2-secondary"
const val AGGREGATE_CLUSTER_NAME = "service-name-2-aggregate"
const val SERVICE_NAME_2 = "service-name-2"
}

private val defaultClusterWeights = mapOf("main" to 50, "secondary" to 50)
private val snapshotPropertiesWithWeights = SnapshotProperties().also {
it.loadBalancing.trafficSplitting.serviceByWeightsProperties = mapOf(
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
)
it.loadBalancing.trafficSplitting.zoneName = FORCE_TRAFFIC_ZONE
it.loadBalancing.trafficSplitting.zoneName = TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE
}

@Test
Expand Down Expand Up @@ -230,13 +230,14 @@ class EnvoySnapshotFactoryTest {
}

@Test
fun `should create traffic splitting configuration`() {
fun `should create weighted snapshot clusters`() {
// given
val envoySnapshotFactory = createSnapshotFactory(snapshotPropertiesWithWeights)
val cluster1 = createCluster(snapshotPropertiesWithWeights, clusterName = CLUSTER_NAME1)
val cluster2 = createCluster(snapshotPropertiesWithWeights, clusterName = CLUSTER_NAME2)
val cluster1 = createCluster(snapshotPropertiesWithWeights, clusterName = DEFAULT_SERVICE_NAME)
val cluster2 =
createCluster(snapshotPropertiesWithWeights, clusterName = SERVICE_NAME_2)
val group: Group = createServicesGroup(
dependencies = arrayOf(cluster1.name to null),
dependencies = arrayOf(cluster2.name to null),
snapshotProperties = snapshotPropertiesWithWeights
)
val globalSnapshot = createGlobalSnapshot(cluster1, cluster2)
Expand All @@ -254,29 +255,30 @@ class EnvoySnapshotFactoryTest {
assertThat(it.clusterName).isEqualTo(MAIN_CLUSTER_NAME)
assertThat(it.endpointsList)
.anyMatch { e -> e.locality.zone == CURRENT_ZONE }
.anyMatch { e -> e.locality.zone == FORCE_TRAFFIC_ZONE }
.anyMatch { e -> e.locality.zone == TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE }
}
.anySatisfy {
assertThat(it.clusterName).isEqualTo(SECONDARY_CLUSTER_NAME)
assertThat(it.endpointsList)
.allMatch { e -> e.locality.zone == FORCE_TRAFFIC_ZONE }
.allMatch { e -> e.locality.zone == TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE }
}
}

@Test
fun `should not create traffic splitting configuration when zone condition isn't complied`() {
fun `should get regular snapshot clusters when traffic splitting zone condition isn't complied`() {
// given
val defaultProperties = SnapshotProperties().also {
it.dynamicListeners.enabled = false
it.loadBalancing.trafficSplitting.serviceByWeightsProperties = mapOf(
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
DEFAULT_SERVICE_NAME to defaultClusterWeights
)
it.loadBalancing.trafficSplitting.zoneName = "not-matching-dc"
}
val envoySnapshotFactory = createSnapshotFactory(defaultProperties)
val cluster1 = createCluster(defaultProperties, clusterName = CLUSTER_NAME1)
val cluster2 = createCluster(defaultProperties, clusterName = CLUSTER_NAME2)
val cluster1 = createCluster(defaultProperties, clusterName = DEFAULT_SERVICE_NAME)
val cluster2 = createCluster(defaultProperties, clusterName = SERVICE_NAME_2)
val group: Group = createServicesGroup(
dependencies = arrayOf(cluster1.name to null),
dependencies = arrayOf(cluster2.name to null),
snapshotProperties = defaultProperties
)
val globalSnapshot = createGlobalSnapshot(cluster1, cluster2)
Expand All @@ -292,11 +294,11 @@ class EnvoySnapshotFactoryTest {
}

@Test
fun `should create traffic splitting configuration for wildcard dependencies`() {
fun `should create weighted snapshot clusters for wildcard dependencies`() {
// given
val envoySnapshotFactory = createSnapshotFactory(snapshotPropertiesWithWeights)
val cluster1 = createCluster(snapshotPropertiesWithWeights, clusterName = CLUSTER_NAME1)
val cluster2 = createCluster(snapshotPropertiesWithWeights, clusterName = CLUSTER_NAME2)
val cluster1 = createCluster(snapshotPropertiesWithWeights, clusterName = DEFAULT_SERVICE_NAME)
val cluster2 = createCluster(snapshotPropertiesWithWeights, clusterName = SERVICE_NAME_2)
val wildcardTimeoutPolicy = outgoingTimeoutPolicy(connectionIdleTimeout = 12)

val group: Group = createAllServicesGroup(
Expand All @@ -311,14 +313,9 @@ class EnvoySnapshotFactoryTest {

// then
assertThat(snapshot.clusters().resources())
.containsKeys(
MAIN_CLUSTER_NAME,
SECONDARY_CLUSTER_NAME,
AGGREGATE_CLUSTER_NAME,
CLUSTER_NAME2,
"cluster-2-secondary",
"cluster-2-aggregate"
)
.containsKey(MAIN_CLUSTER_NAME)
.containsKey(SECONDARY_CLUSTER_NAME)
.containsKey(AGGREGATE_CLUSTER_NAME)
}

@Test
Expand Down Expand Up @@ -477,17 +474,23 @@ class EnvoySnapshotFactoryTest {
)
}

private fun createGlobalSnapshot(
vararg clusters: Cluster,
securedClusters: List<Cluster> = clusters.asList()
): GlobalSnapshot {
private fun createGlobalSnapshot(vararg clusters: Cluster): GlobalSnapshot {
return GlobalSnapshot(
SnapshotResources.create<Cluster>(clusters.toList(), "pl/allegro/tech/servicemesh/envoycontrol/v3")
.resources(),
clusters.map { it.name }.toSet(),
SnapshotResources.create<ClusterLoadAssignment>(createLoadAssignments(clusters.toList()), "v1").resources(),
createClusterConfigurations(),
SnapshotResources.create<Cluster>(securedClusters, "v3").resources()
SnapshotResources.create<Cluster>(clusters.toList(), "v3").resources()
)
}

private fun createLoadAssignments(clusters: List<Cluster>): List<ClusterLoadAssignment> {
return clusters.map {
ClusterLoadAssignment.newBuilder()
.setClusterName(it.name)
.addAllEndpoints(createEndpoints())
.build()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import org.junit.jupiter.api.Test
import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.GlobalSnapshot
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.AGGREGATE_CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.CLUSTER_NAME1
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.CLUSTER_NAME2
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.DEFAULT_CLUSTER_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.DEFAULT_SERVICE_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.MAIN_CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.SECONDARY_CLUSTER_NAME
Expand All @@ -29,7 +29,7 @@ internal class EnvoyClustersFactoryTest {
private val factory = EnvoyClustersFactory(SnapshotProperties())
private val snapshotPropertiesWithWeights = SnapshotProperties().apply {
loadBalancing.trafficSplitting.serviceByWeightsProperties = mapOf(
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
DEFAULT_SERVICE_NAME to TestData.DEFAULT_CLUSTER_WEIGHTS
)
loadBalancing.trafficSplitting.zoneName = TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceName
import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ClusterWeights
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingPriorityProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
Expand Down Expand Up @@ -55,14 +54,11 @@ internal class EnvoyEndpointsFactoryTest {

private val serviceName = "service-one"

private val serviceName2 = "service-two"

private val secondaryClusterName = "service-one-secondary"

private val defaultWeights = ClusterWeights().apply {
mainClusterWeight = 50
secondaryClusterWeight = 50
}
private val serviceName2 = "service-two"

private val defaultWeights = mapOf("main" to 50, "secondary" to 50)

private val defaultZone = "DC1"

Expand Down Expand Up @@ -541,7 +537,7 @@ internal class EnvoyEndpointsFactoryTest {
}

private fun snapshotPropertiesWithTrafficSplitting(
serviceByWeights: Map<String, ClusterWeights>,
serviceByWeights: Map<String, Map<String, Int>>,
zone: String = defaultZone
) =
SnapshotProperties().apply {
Expand All @@ -551,7 +547,7 @@ internal class EnvoyEndpointsFactoryTest {
}
}

private fun String.toRouteSpecification(weights: ClusterWeights = defaultWeights): RouteSpecification {
private fun String.toRouteSpecification(weights: Map<String, Int> = defaultWeights): RouteSpecification {
return RouteSpecification(this, listOf(), DependencySettings(), weights)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster
import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource
import io.envoyproxy.envoy.config.core.v3.ConfigSource
import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions
import pl.allegro.tech.servicemesh.envoycontrol.EnvoySnapshotFactoryTest.Companion.CLUSTER_NAME
import pl.allegro.tech.servicemesh.envoycontrol.EnvoySnapshotFactoryTest.Companion.DEFAULT_IDLE_TIMEOUT
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ClusterConfiguration
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.CLUSTER_NAME

fun createCluster(
defaultProperties: SnapshotProperties = SnapshotProperties(),
clusterName: String = CLUSTER_NAME,
idleTimeout: Long = DEFAULT_IDLE_TIMEOUT
idleTimeout: Long = TestData.DEFAULT_IDLE_TIMEOUT
): Cluster {
return Cluster.newBuilder().setName(clusterName)
.setType(Cluster.DiscoveryType.EDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints
import pl.allegro.tech.servicemesh.envoycontrol.EnvoySnapshotFactoryTest
import pl.allegro.tech.servicemesh.envoycontrol.utils.TestData.CURRENT_ZONE

fun createLoadAssignments(clusters: List<Cluster>): List<ClusterLoadAssignment> {
return clusters.map {
Expand All @@ -17,8 +17,8 @@ fun createLoadAssignments(clusters: List<Cluster>): List<ClusterLoadAssignment>

fun createEndpoints(): List<LocalityLbEndpoints> =
listOf(
createEndpoint(EnvoySnapshotFactoryTest.CURRENT_ZONE),
createEndpoint(EnvoySnapshotFactoryTest.FORCE_TRAFFIC_ZONE)
createEndpoint(CURRENT_ZONE),
createEndpoint(TestData.TRAFFIC_SPLITTING_FORCE_TRAFFIC_ZONE)
)

fun createEndpoint(zone: String): LocalityLbEndpoints {
Expand Down
Loading

0 comments on commit a3aed17

Please sign in to comment.