Skip to content

Commit

Permalink
clean up doc level queries on dry run (#1430)
Browse the repository at this point in the history
Signed-off-by: Joanne Wang <[email protected]>
  • Loading branch information
jowg-amazon authored and engechas committed Mar 18, 2024
1 parent 261e16d commit c00586a
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
} else {
// Clean up any queries created by the dry run monitor
monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata)
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
Expand All @@ -38,8 +40,16 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.core.action.ActionListener
import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.reindex.BulkByScrollResponse
import org.opensearch.index.reindex.DeleteByQueryAction
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
import org.opensearch.rest.RestStatus
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)

Expand Down Expand Up @@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return true
}

/**
 * Best-effort removal of the doc level queries left behind in the query indices by a dry-run execution.
 *
 * For every query index listed in [monitorMetadata]'s `sourceToQueryIndexMapping` that currently exists,
 * a delete-by-query (with refresh) is issued for documents that have a `monitor_id` field whose value is
 * not matched by the `*` wildcard.
 * NOTE(review): presumably these are the queries indexed with an empty monitor id by dry runs - confirm.
 *
 * Query indices that do not exist are skipped individually. Bulk failures and any thrown exception are
 * logged; nothing is propagated to the caller.
 *
 * @param monitorMetadata metadata whose source-to-query-index mapping names the query indices to clean up.
 */
suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) {
    try {
        monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) ->
            val indicesExistsResponse: IndicesExistsResponse =
                client.suspendUntil {
                    client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
                }
            if (!indicesExistsResponse.isExists) {
                // Labeled return: skip only this index. A bare `return` inside the inline forEach lambda
                // would leave the whole function and abandon the remaining query indices.
                return@forEach
            }

            val queryBuilder = QueryBuilders.boolQuery()
                .must(QueryBuilders.existsQuery("monitor_id"))
                .mustNot(QueryBuilders.wildcardQuery("monitor_id", "*"))

            // Bridge the callback-based delete-by-query API into this suspend function.
            val response: BulkByScrollResponse = suspendCoroutine { cont ->
                DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
                    .source(queryIndex)
                    .filter(queryBuilder)
                    .refresh(true)
                    .execute(
                        object : ActionListener<BulkByScrollResponse> {
                            override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
                            override fun onFailure(t: Exception) = cont.resumeWithException(t)
                        }
                    )
            }
            response.bulkFailures.forEach {
                log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ")
            }
        }
    } catch (e: Exception) {
        // Cleanup is best effort: a failure here must not fail the dry run itself.
        log.error("Failed to delete doc level queries on dry run", e)
    }
}

fun docLevelQueryIndexExists(dataSources: DataSources): Boolean {
val clusterState = clusterService.state()
return clusterState.metadata.hasAlias(dataSources.queryIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)

// ensure doc level query is deleted on dry run
val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
}

fun `test dryrun execute monitor with queryFieldNames set up with correct field`() {
Expand Down Expand Up @@ -297,6 +312,120 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex"))

// ensure doc level query is deleted on dry run
val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
}

/**
 * Runs one doc level monitor as a dry run and verifies that no query is left in the query index, then
 * creates and executes a second monitor normally and verifies that exactly its query is stored.
 */
fun `test execute monitor returns search result with dryrun then without dryrun ensure dry run query not saved`() {
    val testIndex = createTestIndex()
    val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
    val testDoc = """{
        "message" : "This is an error from IAD region",
        "test_strict_date_time" : "$testTime",
        "test_field" : "us-west-2"
    }"""

    val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
    val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

    val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val monitor = randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))

    // Match-all search used to inspect the contents of the query index.
    val request = """{
        "size": 10,
        "query": {
            "match_all": {}
        }
    }"""

    // Searches the query index and returns the parsed response, failing the test on a non-OK status.
    fun searchQueryIndex(): SearchResponse {
        val httpResponse = adminClient().makeRequest(
            "GET", "/${monitor.dataSources.queryIndex}/_search",
            StringEntity(request, ContentType.APPLICATION_JSON)
        )
        assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
        return SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
    }

    indexDoc(testIndex, "1", testDoc)
    indexDoc(testIndex, "2", testDoc)

    val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

    val output = entityAsMap(response)

    assertEquals(monitor.name, output["monitor_name"])
    @Suppress("UNCHECKED_CAST")
    val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
    @Suppress("UNCHECKED_CAST")
    val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
    assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
    assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
    assertTrue("Incorrect search result", matchingDocsToQuery.contains("2|$testIndex"))

    // ensure doc level query is deleted on dry run
    // Require total hits to be present so the assertion cannot be skipped silently.
    val hitsAfterDryRun = checkNotNull(searchQueryIndex().hits.totalHits) {
        "Total hits missing from query index search response"
    }
    assertEquals("Query saved in query index", 0L, hitsAfterDryRun.value)

    // create and execute second monitor not as dryrun
    val testIndex2 = createTestIndex("test1")
    val testTime2 = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
    val testDoc2 = """{
        "message" : "This is an error from IAD region",
        "test_strict_date_time" : "$testTime2",
        "test_field" : "us-east-1"
    }"""

    val docQuery2 = DocLevelQuery(query = "test_field:\"us-east-1\"", name = "3", fields = listOf())
    val docLevelInput2 = DocLevelMonitorInput("description", listOf(testIndex2), listOf(docQuery2))

    val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val monitor2 = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput2), triggers = listOf(trigger2)))
    assertNotNull(monitor2.id)

    indexDoc(testIndex2, "1", testDoc2)
    indexDoc(testIndex2, "5", testDoc2)

    val response2 = executeMonitor(monitor2.id)
    val output2 = entityAsMap(response2)

    assertEquals(monitor2.name, output2["monitor_name"])
    @Suppress("UNCHECKED_CAST")
    val searchResult2 = (output2.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
    @Suppress("UNCHECKED_CAST")
    val matchingDocsToQuery2 = searchResult2[docQuery2.id] as List<String>
    assertEquals("Incorrect search result", 2, matchingDocsToQuery2.size)
    assertTrue("Incorrect search result", matchingDocsToQuery2.containsAll(listOf("1|$testIndex2", "5|$testIndex2")))

    val alerts = searchAlertsWithFilter(monitor2)
    assertEquals("Alert saved for test monitor", 2, alerts.size)

    val findings = searchFindings(monitor2)
    assertEquals("Findings saved for test monitor", 2, findings.size)
    assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
    assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))

    // ensure query from second monitor was saved
    // NOTE(review): the search still targets the first monitor's queryIndex, as before; this presumes
    // both monitors use the same query index - confirm.
    val expectedQueries = listOf("test_field_test1_${monitor2.id}:\"us-east-1\"")
    val searchResponse = searchQueryIndex()
    searchResponse.hits.forEach { hit ->
        @Suppress("UNCHECKED_CAST")
        val query = ((hit.sourceAsMap["query"] as Map<String, Any>)["query_string"] as Map<String, Any>)["query"]
        assertTrue(expectedQueries.contains(query))
    }
    val hitsAfterRealRun = checkNotNull(searchResponse.hits.totalHits) {
        "Total hits missing from query index search response"
    }
    assertEquals("Query saved in query index", 1L, hitsAfterRealRun.value)
}

fun `test execute monitor generates alerts and findings`() {
Expand Down

0 comments on commit c00586a

Please sign in to comment.