Skip to content

Commit

Permalink
Fixed flaky tests under BasicReplicationIT and PauseReplicationIT (#282)
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Kumar <[email protected]>
  • Loading branch information
saikaranam-amazon authored Jan 11, 2022
1 parent 3504db4 commit 725ad4b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
20 changes: 10 additions & 10 deletions src/test/kotlin/org/opensearch/replication/BasicReplicationIT.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
var response = leader.index(IndexRequest(leaderIndex).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)

assertBusy {
assertBusy({
val getResponse = follower.get(GetRequest(followerIndex, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}, 60L, TimeUnit.SECONDS)

// Ensure force merge on leader doesn't impact replication
for (i in 2..5) {
Expand All @@ -73,13 +73,13 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
response = leader.index(IndexRequest(leaderIndex).id("$i").source(source), RequestOptions.DEFAULT)
assertThat(response.result).isEqualTo(Result.CREATED)
}
assertBusy {
assertBusy({
for (i in 2..10) {
val getResponse = follower.get(GetRequest(followerIndex, "$i"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}
}, 60L, TimeUnit.SECONDS)

// Force merge on follower however isn't allowed due to WRITE block
Assertions.assertThatThrownBy {
Expand Down Expand Up @@ -131,31 +131,31 @@ class BasicReplicationIT : MultiClusterRestTestCase() {
var response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to create leader data").isEqualTo(Result.CREATED)

assertBusy {
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
}, 60L, TimeUnit.SECONDS)

// Update document
source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
response = leaderClient.index(IndexRequest(leaderIndexName).id("1").source(source), RequestOptions.DEFAULT)
assertThat(response.result).withFailMessage("Failed to update leader data").isEqualTo(Result.UPDATED)

assertBusy {
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isTrue()
assertThat(getResponse.sourceAsMap).isEqualTo(source)
}
},60L, TimeUnit.SECONDS)

// Delete document
val deleteResponse = leaderClient.delete(DeleteRequest(leaderIndexName).id("1"), RequestOptions.DEFAULT)
assertThat(deleteResponse.result).withFailMessage("Failed to delete leader data").isEqualTo(Result.DELETED)

assertBusy {
assertBusy({
val getResponse = followerClient.get(GetRequest(followerIndexName, "1"), RequestOptions.DEFAULT)
assertThat(getResponse.isExists).isFalse()
}
}, 60L, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.replication.resumeReplication
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.opensearch.replication.updateReplication
import org.opensearch.replication.getShardReplicationTasks
import org.apache.http.util.EntityUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
Expand All @@ -46,12 +47,11 @@ import java.util.concurrent.TimeUnit
)
class PauseReplicationIT: MultiClusterRestTestCase() {
private val leaderIndexName = "leader_index"
private val followerIndexName = "paused_index"

fun `test pause replication in following state and empty index`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)

val followerIndexName = "pause_index_follow_state"
createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Expand Down Expand Up @@ -108,6 +108,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
| """.trimMargin())
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
val followerIndexName = "pause_index_restore_state"
createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).settings(settings),
Expand All @@ -128,13 +129,20 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
followerClient.pauseReplication(followerIndexName)
}.isInstanceOf(ResponseException::class.java)
.hasMessageContaining("Index is in restore phase currently for index: ${followerIndexName}")
// wait for the shard tasks to be up as the replication block is added before adding shard replication tasks
// During intermittent test failures, stop replication under finally block executes before this without removing
// replication block (even though next call to _stop replication API can succeed in removing this block).
assertBusy({
assertTrue(followerClient.getShardReplicationTasks(followerIndexName).isNotEmpty())
}, 30L, TimeUnit.SECONDS)
} finally {
followerClient.stopReplication(followerIndexName)
}
}

fun `test pause without replication in progress`() {
val followerClient = getClientForCluster(FOLLOWER)
val followerIndexName = "pause_index_no_repl"
//ToDo : Using followerIndex interferes with other test. Is wipeIndicesFromCluster not working ?
var randomIndex = "random"
val createIndexResponse = followerClient.indices().create(CreateIndexRequest(randomIndex),
Expand All @@ -153,6 +161,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {
fun `test pause replication and stop replication`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
val followerIndexName = "pause_index_with_stop"
createConnectionBetweenClusters(FOLLOWER, LEADER)

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
Expand Down Expand Up @@ -182,6 +191,7 @@ class PauseReplicationIT: MultiClusterRestTestCase() {

fun `test pause replication when leader cluster is unavailable`() {
val followerClient = getClientForCluster(FOLLOWER)
val followerIndexName = "pause_index_leader_down"
try {
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER)
Expand Down

0 comments on commit 725ad4b

Please sign in to comment.