Optimize bucket-level monitor to resolve aliases and query only those time-series indices that contain docs within the timeframe of the range query filter in the search input #1701

Merged 4 commits on Oct 21, 2024
@@ -295,7 +295,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings))
.registerInputService(
InputService(
client,
scriptService,
namedWriteableRegistry,
xContentRegistry,
clusterService,
settings,
indexNameExpressionResolver
)
)
.registerTriggerService(triggerService)
.registerAlertService(alertService)
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
125 changes: 123 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
@@ -15,12 +15,14 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.use
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.BytesStreamOutput
@@ -40,12 +42,14 @@ import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.query.RangeQueryBuilder
import org.opensearch.index.query.TermsQueryBuilder
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Duration
import java.time.Instant

/** Service that handles the collection of input results for Monitor executions */
@@ -55,7 +59,8 @@ class InputService(
val namedWriteableRegistry: NamedWriteableRegistry,
val xContentRegistry: NamedXContentRegistry,
val clusterService: ClusterService,
val settings: Settings
val settings: Settings,
val indexNameExpressionResolver: IndexNameExpressionResolver
) {

private val logger = LogManager.getLogger(InputService::class.java)
@@ -245,8 +250,9 @@ class InputService(
.execute()

val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(searchInput.indices, clusterService)
val resolvedIndexes = resolveOnlyQueryableIndicesFromLocalClusterAliases(monitor, periodEnd, searchInput.query.query(), indexes)
val searchRequest = SearchRequest()
.indices(*indexes.toTypedArray())
.indices(*resolvedIndexes.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())

XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
@@ -256,6 +262,72 @@ class InputService(
return searchRequest
}

/**
 * Resolves concrete indices from aliases based on a time range query and their availability in the local cluster.
 *
 * If an index passed to OpenSearch is an alias, this method selects only those concrete indices
 * resolved from the alias that satisfy one of the following criteria:
 *
 * 1. The index's creation date falls within the time range specified by the query's timestamp field.
 * 2. The index immediately precedes the time range in terms of creation date.
 *
 * This ensures that queries targeting aliases consider only the relevant indices based on their creation time,
 * including the one immediately before the specified range to account for potential data at the boundary.
 */
private fun resolveOnlyQueryableIndicesFromLocalClusterAliases(
monitor: Monitor,
periodEnd: Instant,
query: QueryBuilder,
indexes: List<String>,
): List<String> {
val resolvedIndexes = ArrayList<String>()
indexes.forEach {
// We don't optimize for remote cluster aliases; we pass them directly to the search request
if (CrossClusterMonitorUtils.isRemoteClusterIndex(it, clusterService))
resolvedIndexes.add(it)
else {
val state = clusterService.state()
if (IndexUtils.isAlias(it, state)) {
val resolveStartTimeOfQueryTimeRange = resolveStartTimeofQueryTimeRange(monitor, query, periodEnd)
if (resolveStartTimeOfQueryTimeRange != null) {
val indices = IndexUtils.resolveAllIndices(listOf(it), clusterService, indexNameExpressionResolver)
val sortedIndices = indices
.mapNotNull { state.metadata().index(it) } // Get IndexMetadata for each index
.sortedBy { it.creationDate } // Sort by creation date

var includePrevious = true
for (i in sortedIndices.indices) {
val indexMetadata = sortedIndices[i]
val creationDate = indexMetadata.creationDate

if (creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()) {
resolvedIndexes.add(indexMetadata.index.name)
includePrevious = false // No need to include previous anymore
} else if (
includePrevious && (
i == sortedIndices.lastIndex ||
sortedIndices[i + 1].creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()
)
) {
// Include the index immediately before the timestamp
resolvedIndexes.add(indexMetadata.index.name)
includePrevious = false
}
}
} else {
// add the alias as-is without optimizing to its resolved indices
resolvedIndexes.add(it)
}
} else {
resolvedIndexes.add(it)
}
}
}
return resolvedIndexes
}
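
For illustration only (not part of this change), here is a minimal, self-contained sketch of the selection rule described in the KDoc above, using hypothetical index names and creation timestamps: every index created at or after the resolved start time is kept, plus the single index created immediately before it.

import java.time.Instant

// Hypothetical example data: backing indices of an alias with their creation times.
data class BackingIndex(val name: String, val creationDate: Instant)

// Keep indices created at or after startTime, plus the one created immediately before it,
// mirroring the selection rule used by resolveOnlyQueryableIndicesFromLocalClusterAliases().
fun selectQueryableIndices(indices: List<BackingIndex>, startTime: Instant): List<String> {
    val sorted = indices.sortedBy { it.creationDate }
    val selected = mutableListOf<String>()
    var includePrevious = true
    for (i in sorted.indices) {
        val index = sorted[i]
        if (index.creationDate >= startTime) {
            selected.add(index.name)
            includePrevious = false
        } else if (includePrevious &&
            (i == sorted.lastIndex || sorted[i + 1].creationDate >= startTime)
        ) {
            selected.add(index.name) // boundary index that may still hold in-range docs
            includePrevious = false
        }
    }
    return selected
}

fun main() {
    val startTime = Instant.parse("2024-10-14T00:00:00Z")
    val indices = listOf(
        BackingIndex("logs-000001", Instant.parse("2024-10-01T00:00:00Z")),
        BackingIndex("logs-000002", Instant.parse("2024-10-10T00:00:00Z")),
        BackingIndex("logs-000003", Instant.parse("2024-10-15T00:00:00Z"))
    )
    // Prints [logs-000002, logs-000003]: logs-000002 is the boundary index created just before startTime.
    println(selectQueryableIndices(indices, startTime))
}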

private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList<Map<String, Any>> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

@@ -289,4 +361,53 @@ class InputService(
}
return results
}

fun resolveStartTimeofQueryTimeRange(monitor: Monitor, query: QueryBuilder, periodEnd: Instant): Instant? {
try {
val rangeQuery = findRangeQuery(query) ?: return null
val searchParameter = rangeQuery.from().toString() // we expect the 'from' value in the form {{period_end}}||-<timeframe>

val timeframeString = searchParameter.substringAfter("||-")
val timeframeRegex = Regex("(\\d+)([a-zA-Z]+)")
val matchResult = timeframeRegex.find(timeframeString)
val (amount, unit) = matchResult?.destructured?.let { (a, u) -> a to u }
?: throw IllegalArgumentException("Invalid timeframe format: $timeframeString")
val duration = when (unit) {
"s" -> Duration.ofSeconds(amount.toLong())
"m" -> Duration.ofMinutes(amount.toLong())
"h" -> Duration.ofHours(amount.toLong())
"d" -> Duration.ofDays(amount.toLong())
else -> throw IllegalArgumentException("Invalid time unit: $unit")
}

return periodEnd.minus(duration)
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id}:" +
" Failed to resolve time frame of search query while optimizing to query only on few of alias' concrete indices",
e
)
return null // won't do optimization as we failed to resolve the timeframe due to unexpected error
}
}
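
As a worked example (hypothetical values, not taken from this diff): a range 'from' of "{{period_end}}||-1h" yields the suffix "1h", which this method maps to Duration.ofHours(1) and subtracts from periodEnd.

import java.time.Duration
import java.time.Instant

fun main() {
    // "{{period_end}}||-1h" -> suffix "1h" -> Duration.ofHours(1)
    val periodEnd = Instant.parse("2024-10-21T10:00:00Z")
    val resolvedStartTime = periodEnd.minus(Duration.ofHours(1))
    println(resolvedStartTime) // 2024-10-21T09:00:00Z
}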

private fun findRangeQuery(queryBuilder: QueryBuilder?): RangeQueryBuilder? {
if (queryBuilder == null) return null
if (queryBuilder is RangeQueryBuilder) return queryBuilder

if (queryBuilder is BoolQueryBuilder) {
for (clause in queryBuilder.must()) {
val rangeQuery = findRangeQuery(clause)
if (rangeQuery != null) return rangeQuery
}
for (clause in queryBuilder.should()) {
val rangeQuery = findRangeQuery(clause)
if (rangeQuery != null) return rangeQuery
}
// You can also check queryBuilder.filter() and queryBuilder.mustNot() if needed
}

// Add handling for other query types if necessary (e.g., NestedQueryBuilder, etc.)
return null
}
}
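
For context, a short sketch (not part of the PR) of the kind of search-input query findRangeQuery() is written to handle: the range clause sits under bool.must (or bool.should), and its 'from' value carries the {{period_end}}||-<timeframe> expression. The field name below is purely illustrative.

import org.opensearch.index.query.QueryBuilder
import org.opensearch.index.query.QueryBuilders

// Hypothetical bucket-level monitor query: findRangeQuery() would locate the nested range clause,
// and resolveStartTimeofQueryTimeRange() would parse its "from" value ("{{period_end}}||-1d").
val sampleQuery: QueryBuilder = QueryBuilders.boolQuery()
    .must(QueryBuilders.matchQuery("test_field", "some_value"))
    .must(
        QueryBuilders.rangeQuery("test_strict_date_time")
            .gte("{{period_end}}||-1d")
            .lte("{{period_end}}")
    )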
@@ -227,5 +227,10 @@ class CrossClusterMonitorUtils {
return if (clusterName.isNotEmpty()) "$clusterName:$indexName"
else indexName
}

fun isRemoteClusterIndex(index: String, clusterService: ClusterService): Boolean {
val clusterName = parseClusterName(index)
return clusterName.isNotEmpty() && clusterService.clusterName.value() != clusterName
}
}
}
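
A brief self-contained sketch of the check this helper performs, with the cluster-name parsing assumed to be a prefix-before-colon split and the cluster/index names made up for illustration.

// Standalone approximation of isRemoteClusterIndex(), without a ClusterService.
fun isRemoteClusterIndexSketch(index: String, localClusterName: String): Boolean {
    val clusterName = index.substringBefore(':', missingDelimiterValue = "")
    return clusterName.isNotEmpty() && clusterName != localClusterName
}

fun main() {
    println(isRemoteClusterIndexSketch("logs-alias", "local-cluster"))                   // false: no cluster prefix
    println(isRemoteClusterIndexSketch("local-cluster:logs-alias", "local-cluster"))     // false: prefix matches the local cluster
    println(isRemoteClusterIndexSketch("follower-cluster:logs-alias", "local-cluster"))  // true: different cluster prefix
}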
@@ -1000,19 +1000,33 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return createTestAlias(alias = alias, indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex))
}

protected fun createTestAlias(
alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT),
numOfAliasIndices: Int = randomIntBetween(1, 10),
includeWriteIndex: Boolean = true,
indicesMapping: String,
): MutableMap<String, MutableMap<String, Boolean>> {
return createTestAlias(
alias = alias,
indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex),
indicesMapping = indicesMapping
)
}

protected fun createTestAlias(
alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT),
indices: Map<String, Boolean> = randomAliasIndices(
alias = alias,
num = randomIntBetween(1, 10),
includeWriteIndex = true
),
createIndices: Boolean = true
createIndices: Boolean = true,
indicesMapping: String = ""
): MutableMap<String, MutableMap<String, Boolean>> {
val indicesMap = mutableMapOf<String, Boolean>()
val indicesJson = jsonBuilder().startObject().startArray("actions")
indices.keys.map {
if (createIndices) createTestIndex(index = it, mapping = "")
if (createIndices) createTestIndex(index = it, indicesMapping)
val isWriteIndex = indices.getOrDefault(it, false)
indicesMap[it] = isWriteIndex
val indexMap = mapOf(
@@ -1211,6 +1225,41 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
}
}

protected fun insertSampleTimeSerializedDataCurrentTime(index: String, data: List<String>) {
data.forEachIndexed { i, value ->
val time = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time)
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value",
"number": "$i"
}
""".trimIndent()
// Indexing documents with deterministic doc ids to allow for easy selective deletion during testing
indexDoc(index, (i + 1).toString(), testDoc)
}
}

protected fun insertSampleTimeSerializedDataWithTime(
index: String,
data: List<String>,
time: ZonedDateTime? = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS),
) {
data.forEachIndexed { i, value ->
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time)
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value",
"number": "$i"
}
""".trimIndent()
// Indexing documents with deterministic doc ids to allow for easy selective deletion during testing
indexDoc(index, (i + 1).toString(), testDoc)
}
}
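
A sketch of how these helpers might be combined inside a test extending AlertingRestTestCase (the alias/index names, field mapping, and lookback window are assumptions for illustration, not taken from this diff).

// Hypothetical setup: an alias over two backing indices, one back-filled with week-old data
// and the write index filled with current data, so a short lookback window only matches the latter.
// Assumes the usual test imports (ZonedDateTime, ChronoUnit) are already present in the test class.
fun setupTimeSeriesAliasForTest() {
    val mapping = """
        "properties": {
            "test_strict_date_time": { "type": "date", "format": "strict_date_time" },
            "test_field": { "type": "keyword" },
            "number": { "type": "keyword" }
        }
    """.trimIndent() // assumed createTestIndex() mapping format

    createTestAlias(
        alias = "test-alias",
        indices = mapOf("test-index-old" to false, "test-index-new" to true),
        indicesMapping = mapping
    )

    insertSampleTimeSerializedDataWithTime(
        "test-index-old",
        listOf("old-1", "old-2"),
        ZonedDateTime.now().minusDays(7).truncatedTo(ChronoUnit.MILLIS)
    )
    insertSampleTimeSerializedDataCurrentTime("test-index-new", listOf("new-1", "new-2"))
}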

protected fun deleteDataWithDocIds(index: String, docIds: List<String>) {
docIds.forEach {
deleteDoc(index, it)