Skip to content

Commit

Permalink
optimize doc-level monitor execution workflow for datastreams #1302
Browse files Browse the repository at this point in the history
Signed-off-by: Megha Goyal <[email protected]>
  • Loading branch information
goyamegh committed Mar 15, 2024
1 parent 40a1c4b commit 26fb04a
Show file tree
Hide file tree
Showing 5 changed files with 515 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
// Resolve all passed indices to concrete indices
val concreteIndices = IndexUtils.resolveAllIndices(
val allConcreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (concreteIndices.isEmpty()) {
if (allConcreteIndices.isEmpty()) {
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}
Expand All @@ -135,17 +135,32 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
if (!concreteIndices.contains(ind)) {
if (!allConcreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}

docLevelMonitorInput.indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
var lastWriteIndex: String? = null
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
Expand All @@ -170,7 +185,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext

if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
} else {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,18 @@ object MonitorMetadataService :
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
try {
if (index == null) return mutableMapOf()
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
val indices = mutableListOf<String>()
if (IndexUtils.isAlias(index, clusterService.state()) ||
IndexUtils.isDataStream(index, clusterService.state())
) {
IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) }
} else {
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
}
indices.addAll(getIndexResponse.indices())
}
val indices = getIndexResponse.indices()

indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,27 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ

// Run through each backing index and apply appropriate mappings to query index
indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)

if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}

val updatedIndexName = indexName.replace("*", "_")
val updatedProperties = mutableMapOf<String, Any>()
val allFlattenPaths = mutableSetOf<Pair<String, String>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,14 +774,19 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody)
assertTrue(
"Unable to index doc: '${doc.take(15)}...' to index: '$index'",
listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
)
return response
}

protected fun createTestIndex(index: String, mapping: String?, alias: String): String {
createIndex(index, Settings.EMPTY, mapping?.trimIndent(), alias)
return index
}

protected fun deleteDoc(index: String, id: String, refresh: Boolean = true): Response {
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client().makeRequest("DELETE", "$index/_doc/$id", params)
Expand Down Expand Up @@ -846,7 +851,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val indicesMap = mutableMapOf<String, Boolean>()
val indicesJson = jsonBuilder().startObject().startArray("actions")
indices.keys.map {
val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "")
val indexName = createTestIndex(index = it, mapping = "")
val isWriteIndex = indices.getOrDefault(indexName, false)
indicesMap[indexName] = isWriteIndex
val indexMap = mapOf(
Expand All @@ -863,17 +868,155 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return mutableMapOf(alias to indicesMap)
}

protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) {
val indexPattern = "$datastream*"
var componentTemplateMappings = "\"properties\": {" +
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
"}"
if (mappings != null) {
componentTemplateMappings = mappings
}
if (useComponentTemplate) {
// Setup index_template
createComponentTemplateWithMappings(
"my_ds_component_template-$datastream",
componentTemplateMappings
)
}
createComposableIndexTemplate(
"my_index_template_ds-$datastream",
listOf(indexPattern),
(if (useComponentTemplate) "my_ds_component_template-$datastream" else null),
mappings,
true,
0
)
createDataStream(datastream)
}

protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) {
client().makeRequest("PUT", "_data_stream/$datastream")
}

protected fun deleteDataStream(datastream: String) {
client().makeRequest("DELETE", "_data_stream/$datastream")
}

protected fun createIndexAlias(alias: String, mappings: String?) {
val indexPattern = "$alias*"
var componentTemplateMappings = "\"properties\": {" +
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
"}"
if (mappings != null) {
componentTemplateMappings = mappings
}
createComponentTemplateWithMappings(
"my_alias_component_template-$alias",
componentTemplateMappings
)
createComposableIndexTemplate(
"my_index_template_alias-$alias",
listOf(indexPattern),
"my_alias_component_template-$alias",
mappings,
false,
0
)
createTestIndex(
"$alias-000001",
null,
"""
"$alias": {
"is_write_index": true
}
""".trimIndent()
)
}

protected fun deleteIndexAlias(alias: String) {
client().makeRequest("DELETE", "$alias*/_alias/$alias")
}

protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) {
val body = """{"template" : { "mappings": {$mappings} }}"""
client().makeRequest(
"PUT",
"_component_template/$componentTemplateName",
emptyMap(),
StringEntity(body, ContentType.APPLICATION_JSON),
BasicHeader("Content-Type", "application/json")
)
}

protected fun createComposableIndexTemplate(
templateName: String,
indexPatterns: List<String>,
componentTemplateName: String?,
mappings: String?,
isDataStream: Boolean,
priority: Int
) {
var body = "{\n"
if (isDataStream) {
body += "\"data_stream\": { },"
}
body += "\"index_patterns\": [" +
indexPatterns.stream().collect(
Collectors.joining(",", "\"", "\"")
) + "],"
if (componentTemplateName == null) {
body += "\"template\": {\"mappings\": {$mappings}},"
}
if (componentTemplateName != null) {
body += "\"composed_of\": [\"$componentTemplateName\"],"
}
body += "\"priority\":$priority}"
client().makeRequest(
"PUT",
"_index_template/$templateName",
emptyMap(),
StringEntity(body, APPLICATION_JSON),
BasicHeader("Content-Type", "application/json")
)
}

protected fun getDatastreamWriteIndex(datastream: String): String {
val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null)
var respAsMap = responseAsMap(response)
if (respAsMap.containsKey("data_streams")) {
respAsMap = (respAsMap["data_streams"] as ArrayList<HashMap<String, *>>)[0]
val indices = respAsMap["indices"] as List<Map<String, Any>>
val index = indices.last()
return index["index_name"] as String
} else {
respAsMap = respAsMap[datastream] as Map<String, Object>
}
val indices = respAsMap["indices"] as Array<String>
return indices.last()
}

protected fun rolloverDatastream(datastream: String) {
client().makeRequest(
"POST",
datastream + "/_rollover",
emptyMap(),
null
)
}

protected fun randomAliasIndices(
alias: String,
num: Int = randomIntBetween(1, 10),
includeWriteIndex: Boolean = true
includeWriteIndex: Boolean = true,
): Map<String, Boolean> {
val indices = mutableMapOf<String, Boolean>()
val writeIndex = randomIntBetween(0, num)
val writeIndex = randomIntBetween(0, num - 1)
for (i: Int in 0 until num) {
var indexName = randomAlphaOfLength(10)
var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
while (indexName.equals(alias) || indices.containsKey(indexName))
indexName = randomAlphaOfLength(10)
indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
indices[indexName] = includeWriteIndex && i == writeIndex
}
return indices
Expand Down
Loading

0 comments on commit 26fb04a

Please sign in to comment.