Skip to content

Commit

Permalink
added postfix, added service name
Browse files Browse the repository at this point in the history
Added logs
Ignored failing test #292
  • Loading branch information
nastassia-dailidava committed Sep 25, 2023
1 parent 2b62bc7 commit 141574d
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class EnvoySnapshotFactory(
globalSnapshot: GlobalSnapshot
): Collection<RouteSpecification> {
val definedServicesRoutes = group.proxySettings.outgoing.getServiceDependencies().map {
getTrafficSplittingRouteSpecification(
buildRouteSpecification(
clusterName = it.service,
routeDomains = listOf(it.service) + getServiceWithCustomDomain(it.service),
settings = it.settings,
Expand All @@ -203,7 +203,7 @@ class EnvoySnapshotFactory(
is AllServicesGroup -> {
val servicesNames = group.proxySettings.outgoing.getServiceDependencies().map { it.service }.toSet()
val allServicesRoutes = globalSnapshot.allServicesNames.subtract(servicesNames).map {
getTrafficSplittingRouteSpecification(
buildRouteSpecification(
clusterName = it,
routeDomains = listOf(it) + getServiceWithCustomDomain(it),
settings = group.proxySettings.outgoing.defaultServiceSettings,
Expand All @@ -216,7 +216,7 @@ class EnvoySnapshotFactory(
}
}

private fun getTrafficSplittingRouteSpecification(
private fun buildRouteSpecification(
clusterName: String,
routeDomains: List<String>,
settings: DependencySettings,
Expand All @@ -233,6 +233,10 @@ class EnvoySnapshotFactory(
"serviceName: $serviceName, clusterName: $clusterName"
)
return if (weights != null && enabledForDependency) {
logger.debug(
"Building traffic splitting route spec, weights: $weights, " +
"serviceName: $serviceName, clusterName: $clusterName, "
)
WeightRouteSpecification(
clusterName,
routeDomains,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class CanaryProperties {
class TrafficSplittingProperties {
var zoneName = ""
var serviceByWeightsProperties: Map<String, ZoneWeights> = mapOf()
var secondaryClusterPostfix = "secondary"
var aggregateClusterPostfix = "aggregate"
}

class ZoneWeights {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,6 @@ class EnvoyClustersFactory(

companion object {
private val logger by logger()
const val SECONDARY_CLUSTER_POSTFIX = "secondary"
const val AGGREGATE_CLUSTER_POSTFIX = "aggregate"

@JvmStatic
fun getSecondaryClusterName(serviceName: String): String {
return "$serviceName-$SECONDARY_CLUSTER_POSTFIX"
}

@JvmStatic
fun getAggregateClusterName(serviceName: String): String {
return "$serviceName-$AGGREGATE_CLUSTER_POSTFIX"
}
}

fun getClustersForServices(
Expand Down Expand Up @@ -253,6 +241,10 @@ class EnvoyClustersFactory(
return Cluster.newBuilder(cluster)
.setCommonHttpProtocolOptions(HttpProtocolOptions.newBuilder().setIdleTimeout(idleTimeoutPolicy))
.setName(clusterName)
.setEdsClusterConfig(
Cluster.EdsClusterConfig.newBuilder(cluster.edsClusterConfig)
.setServiceName(clusterName)
)
.build()
}

Expand All @@ -264,15 +256,13 @@ class EnvoyClustersFactory(
val secondaryCluster = createClusterForGroup(
dependencySettings,
cluster,
getSecondaryClusterName(cluster.name)
"${cluster.name}-${properties.loadBalancing.trafficSplitting.secondaryClusterPostfix}"
)
val aggregateCluster =
createAggregateCluster(mainCluster.name, linkedSetOf(secondaryCluster.name, mainCluster.name))
return listOf(mainCluster, secondaryCluster, aggregateCluster)
.also {
it.forEach { cl ->
logger.debug("Created traffic splitting cluster config with cluster name: {}", cl.name)
}
.onEach {
logger.debug("Created set of cluster configs for traffic splitting: {}", it.toString())
}
}

Expand All @@ -284,9 +274,6 @@ class EnvoyClustersFactory(
): Collection<Cluster> {
return cluster?.let {
if (enableTrafficSplitting(serviceName, clusterLoadAssignment)) {
logger.debug(
"Creating traffic splitting egress cluster config for ${cluster.name}, service: $serviceName"
)
createSetOfClustersForGroup(dependencySettings, cluster)
} else {
listOf(createClusterForGroup(dependencySettings, cluster))
Expand Down Expand Up @@ -362,7 +349,7 @@ class EnvoyClustersFactory(

private fun createAggregateCluster(clusterName: String, aggregatedClusters: Collection<String>): Cluster {
return Cluster.newBuilder()
.setName(getAggregateClusterName(clusterName))
.setName("$clusterName-${properties.loadBalancing.trafficSplitting.aggregateClusterPostfix}")
.setConnectTimeout(Durations.fromMillis(properties.edsConnectionTimeout.toMillis()))
.setLbPolicy(Cluster.LbPolicy.CLUSTER_PROVIDED)
.setClusterType(
Expand Down Expand Up @@ -496,7 +483,7 @@ class EnvoyClustersFactory(
)
)
}
)
).setServiceName(clusterConfiguration.serviceName)
)
.setLbPolicy(properties.loadBalancing.policy)
// TODO: if we want to have multiple memory-backend instances of ratelimit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getSecondaryClusterName
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator

typealias EnvoyProxyLocality = io.envoyproxy.envoy.config.core.v3.Locality
Expand Down Expand Up @@ -91,7 +90,10 @@ class EnvoyEndpointsFactory(
.addAllEndpoints(assignment.endpointsList?.filter { e ->
e.locality.zone == properties.loadBalancing.trafficSplitting.zoneName
})
.setClusterName(getSecondaryClusterName(routeSpec.clusterName))
.setClusterName(
"${routeSpec.clusterName}-" + properties.loadBalancing
.trafficSplitting.secondaryClusterPostfix
)
.build()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.StandardRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getSecondaryClusterName
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.ServiceTagFilterFactory
import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryPolicy as EnvoyControlRetryPolicy

Expand Down Expand Up @@ -358,7 +357,8 @@ class EnvoyEgressRoutesFactory(
WeightedCluster.newBuilder()
.withClusterWeight(routeSpec.clusterName, routeSpec.clusterWeights.main)
.withClusterWeight(
getSecondaryClusterName(routeSpec.clusterName),
"${routeSpec.clusterName}-" + properties.loadBalancing.trafficSplitting
.aggregateClusterPostfix,
routeSpec.clusterWeights.secondary
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ class ThreadPoolMetricTest {

// then
val allMeterNames = meterRegistry.meters.map { it.id.name }
val requiredMeterNames = listOf("grpc-server-worker", "grpc-worker-event-loop", "snapshot-update", "group-snapshot").flatMap {
listOf("$it.executor.completed", "$it.executor.active", "$it.executor.queued", "$it.executor.pool.size")
}
val requiredMeterNames =
listOf("grpc-server-worker", "grpc-worker-event-loop", "snapshot-update", "group-snapshot").flatMap {
listOf("$it.executor.completed", "$it.executor.active", "$it.executor.queued", "$it.executor.pool.size")
}

assertThat(allMeterNames).containsAll(requiredMeterNames)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ internal class EnvoyClustersFactoryTest {
}
.anySatisfy {
assertThat(it.name).isEqualTo(SECONDARY_CLUSTER_NAME)
assertThat(it.edsClusterConfig).isEqualTo(cluster1.edsClusterConfig)
}
.anySatisfy {
assertThat(it.name).isEqualTo(AGGREGATE_CLUSTER_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ fun createCluster(
.setType(Cluster.DiscoveryType.EDS)
.setConnectTimeout(Durations.fromMillis(defaultProperties.edsConnectionTimeout.toMillis()))
.setEdsClusterConfig(
Cluster.EdsClusterConfig.newBuilder().setEdsConfig(
ConfigSource.newBuilder().setAds(AggregatedConfigSource.newBuilder())
)
Cluster.EdsClusterConfig.newBuilder()
.setEdsConfig(
ConfigSource.newBuilder().setAds(
AggregatedConfigSource.newBuilder()
)
).setServiceName(clusterName)
)
.setLbPolicy(defaultProperties.loadBalancing.policy)
.setCommonHttpProtocolOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,20 @@ fun EnvoyExtension.callUpstreamServiceRepeatedly(
)
return stats
}

fun EnvoyExtension.callUpstreamServiceRepeatedly(
vararg services: EchoServiceExtension,
numberOfCalls: Int = 100,
tag: String?
): CallStats {
val stats = CallStats(services.asList())
this.egressOperations.callServiceRepeatedly(
service = upstreamServiceName,
stats = stats,
minRepeat = numberOfCalls,
maxRepeat = numberOfCalls,
repeatUntil = { true },
headers = tag?.let { mapOf("x-service-tag" to it) } ?: emptyMap(),
)
return stats
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,19 @@ class WeightedClustersRoutingTest {
.verifyCallsCountCloseTo(upstreamServiceDC1, 90)
.verifyCallsCountGreaterThan(upstreamServiceDC2, 1)
}

@Test
fun `should route traffic according to weights with service tag`() {
consul.serverFirst.operations.registerServiceWithEnvoyOnEgress(echoEnvoyDC1, name = serviceName)

consul.serverFirst.operations.registerService(upstreamServiceDC1, name = upstreamServiceName, tags = listOf("tag"))
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC1, upstreamServiceName)

consul.serverSecond.operations.registerService(upstreamServiceDC2, name = upstreamServiceName, tags = listOf("tag"))
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC2, upstreamServiceName)

echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2, tag = "tag")
.verifyCallsCountCloseTo(upstreamServiceDC1, 90)
.verifyCallsCountGreaterThan(upstreamServiceDC2, 1)
}
}

0 comments on commit 141574d

Please sign in to comment.