From 725ad4bbb0deaac53a5165f4119e4535da28fc97 Mon Sep 17 00:00:00 2001 From: Sai Date: Tue, 11 Jan 2022 11:49:44 +0530 Subject: [PATCH] Fixed flaky tests under BasicReplicationIT and PauseReplicationIT (#282) Signed-off-by: Sai Kumar --- .../replication/BasicReplicationIT.kt | 20 +++++++++---------- .../integ/rest/PauseReplicationIT.kt | 14 +++++++++++-- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt index a3b9e7f0..7d426ace 100644 --- a/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt @@ -57,11 +57,11 @@ class BasicReplicationIT : MultiClusterRestTestCase() { var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT) assertThat(response.result).isEqualTo(Result.CREATED) - assertBusy { + assertBusy({ val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) - } + }, 60L, TimeUnit.SECONDS) // Ensure force merge on leader doesn't impact replication for (i in 2..5) { @@ -73,13 +73,13 @@ class BasicReplicationIT : MultiClusterRestTestCase() { response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT) assertThat(response.result).isEqualTo(Result.CREATED) } - assertBusy { + assertBusy({ for (i in 2..10) { val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) } - } + }, 60L, TimeUnit.SECONDS) // Force merge on follower however isn't allowed due to WRITE block Assertions.assertThatThrownBy { @@ -131,31 +131,31 @@ class BasicReplicationIT : MultiClusterRestTestCase() { var response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT) assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED) - assertBusy { + assertBusy({ val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) - } + }, 60L, TimeUnit.SECONDS) // Update document source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString()) response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT) assertThat(response.result).withFailMessage("Failed to update leader data").isEqualTo(Result.UPDATED) - assertBusy { + assertBusy({ val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isTrue() assertThat(getResponse.sourceAsMap).isEqualTo(source) - } + },60L, TimeUnit.SECONDS) // Delete document val deleteResponse = leaderClient.delete(DeleteRequest(leaderIndexName).id("1"), RequestOptions.DEFAULT) assertThat(deleteResponse.result).withFailMessage("Failed to delete leader data").isEqualTo(Result.DELETED) - assertBusy { + assertBusy({ val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT) assertThat(getResponse.isExists).isFalse() - } + }, 60L, TimeUnit.SECONDS) } finally { followerClient.stopReplication(followerIndexName) } diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt index 568e08f1..6d013746 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/PauseReplicationIT.kt @@ -23,6 +23,7 @@ import org.opensearch.replication.resumeReplication import org.opensearch.replication.startReplication import org.opensearch.replication.stopReplication import org.opensearch.replication.updateReplication +import org.opensearch.replication.getShardReplicationTasks import org.apache.http.util.EntityUtils import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy @@ -46,12 +47,11 @@ import java.util.concurrent.TimeUnit ) class PauseReplicationIT: MultiClusterRestTestCase() { private val leaderIndexName = "leader_index" - private val followerIndexName = "paused_index" fun `test pause replication in following state and empty index`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) - + val followerIndexName = "pause_index_follow_state" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) @@ -108,6 +108,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() { | """.trimMargin()) val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) + val followerIndexName = "pause_index_restore_state" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings), @@ -128,6 +129,12 @@ class PauseReplicationIT: MultiClusterRestTestCase() { followerClient.pauseReplication(followerIndexName) }.isInstanceOf(ResponseException::class.java) .hasMessageContaining("Index is in restore phase currently for index: ${followerIndexName}") + // wait for the shard tasks to be up as the replication block is added before adding shard replication tasks + // During intermittent test failures, stop replication under finally block executes before this without removing + // replication block (even though next call to _stop replication API can succeed in removing this block). + assertBusy({ + assertTrue(followerClient.getShardReplicationTasks(followerIndexName).isNotEmpty()) + }, 30L, TimeUnit.SECONDS) } finally { followerClient.stopReplication(followerIndexName) } @@ -135,6 +142,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() { fun `test pause without replication in progress`() { val followerClient = getClientForCluster(FOLLOWER) + val followerIndexName = "pause_index_no_repl" //ToDo : Using followerIndex interferes with other test. Is wipeIndicesFromCluster not working ? var randomIndex = "random" val createIndexResponse = followerClient.indices().create(CreateIndexRequest(randomIndex), @@ -153,6 +161,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() { fun `test pause replication and stop replication`() { val followerClient = getClientForCluster(FOLLOWER) val leaderClient = getClientForCluster(LEADER) + val followerIndexName = "pause_index_with_stop" createConnectionBetweenClusters(FOLLOWER, LEADER) val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) @@ -182,6 +191,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() { fun `test pause replication when leader cluster is unavailable`() { val followerClient = getClientForCluster(FOLLOWER) + val followerIndexName = "pause_index_leader_down" try { val leaderClient = getClientForCluster(LEADER) createConnectionBetweenClusters(FOLLOWER, LEADER)