Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test pr ] Test flaky #14

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void testRestStatusForAcknowledgedDecommission() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.cluster.routing;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -64,6 +66,7 @@ public void testPutWeightedRouting() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand All @@ -73,6 +76,7 @@ public void testPutWeightedRouting() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(0)
.get();
assertEquals(response.isAcknowledged(), true);
}
Expand Down Expand Up @@ -199,6 +203,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -270,6 +275,7 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -307,12 +313,14 @@ public void testDeleteWeightedRouting_WeightsNotSet() {
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
ResourceNotFoundException exception = expectThrows(
ResourceNotFoundException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(-1).get()
);
assertEquals(RestStatus.NOT_FOUND, exception.status());
}

public void testDeleteWeightedRouting_WeightsAreSet() {
public void testDeleteWeightedRouting_WeightsAreSet() throws IOException {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
Expand All @@ -339,13 +347,111 @@ public void testDeleteWeightedRouting_WeightsAreSet() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}

public void testPutAndDeleteWithVersioning() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> starting 6 nodes on different zones");
int nodeCountPerAZ = 2;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build());
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "c").build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");

Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// update weights api call with correct version number
weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).get();
assertTrue(response.isAcknowledged());

// update weights api call with incorrect version number
weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0);
WeightedRouting weightedRouting1 = new WeightedRouting("zone", weights);
UnsupportedWeightedRoutingStateException exception = expectThrows(
UnsupportedWeightedRoutingStateException.class,
() -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting1).setVersion(100).get()
);
assertEquals(exception.status(), RestStatus.CONFLICT);

// get weights call
ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.get();

// update weights call using version returned by get api call
weights = Map.of("a", 1.0, "b", 2.0, "c", 5.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(weightedRoutingResponse.getVersion())
.get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weights by awareness attribute
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin()
.cluster()
.prepareDeleteWeightedRouting()
.setAwarenessAttribute("zone")
.setVersion(2)
.get();
assertTrue(deleteResponse.isAcknowledged());

// update weights again and make sure that version number got updated on delete
weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weights
deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get();
assertTrue(deleteResponse.isAcknowledged());

// delete weights call, incorrect version number
UnsupportedWeightedRoutingStateException deleteException = expectThrows(
UnsupportedWeightedRoutingStateException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(7).get()
);
assertEquals(RestStatus.CONFLICT, deleteException.status());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,142 @@

package org.opensearch.action.admin.cluster.shards.routing.weighted.delete;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchGenerationException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.DeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Map;

/**
* Request to delete weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest<ClusterDeleteWeightedRoutingRequest> {
public ClusterDeleteWeightedRoutingRequest() {}
private static final Logger logger = LogManager.getLogger(ClusterDeleteWeightedRoutingRequest.class);

private long version;
private String awarenessAttribute;

public void setVersion(long version) {
this.version = version;
}

public ClusterDeleteWeightedRoutingRequest() {
this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE;
}

public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException {
super(in);
version = in.readLong();
if (in.available() != 0) {
awarenessAttribute = in.readString();
}
}

public long getVersion() {
return version;
}

public String getAwarenessAttribute() {
return awarenessAttribute;
}

public void setAwarenessAttribute(String awarenessAttribute) {
this.awarenessAttribute = awarenessAttribute;
}

public ClusterDeleteWeightedRoutingRequest(String awarenessAttribute) {
this.awarenessAttribute = awarenessAttribute;
this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

/**
* @param source weights definition from request body
* @return this request
*/
public ClusterDeleteWeightedRoutingRequest source(Map<String, String> source) {
try {
if (source.isEmpty()) {
throw new OpenSearchParseException(("Empty request body"));
}
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.map(source);
setRequestBody(BytesReference.bytes(builder), builder.contentType());
} catch (IOException e) {
throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e);
}
return this;
}

public void setRequestBody(BytesReference source, XContentType contentType) {
try (
XContentParser parser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
source,
contentType
)
) {
String versionAttr = null;
XContentParser.Token token;
// move to the first alias
parser.nextToken();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) {
versionAttr = parser.currentName();
} else {
throw new OpenSearchParseException(
"failed to parse delete weighted routing request body [{}], unknown type",
fieldName
);
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) {
this.version = Long.parseLong(parser.text());
}
} else {
throw new OpenSearchParseException("failed to parse delete weighted routing request body");
}
}
} catch (IOException e) {
logger.error("error while parsing delete request for weighted routing request object", e);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(version);
if (awarenessAttribute != null) {
out.writeString(awarenessAttribute);
}
}

@Override
public String toString() {
return "ClusterDeleteWeightedRoutingRequest";
return "ClusterDeleteWeightedRoutingRequest{" + "version= " + version + "awarenessAttribute=" + awarenessAttribute + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,15 @@ public class ClusterDeleteWeightedRoutingRequestBuilder extends ClusterManagerNo
public ClusterDeleteWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterDeleteWeightedRoutingAction action) {
super(client, action, new ClusterDeleteWeightedRoutingRequest());
}

public ClusterDeleteWeightedRoutingRequestBuilder setVersion(long version) {
request.setVersion(version);
return this;
}

public ClusterDeleteWeightedRoutingRequestBuilder setAwarenessAttribute(String attribute) {
request.setAwarenessAttribute(attribute);
return this;
}

}
Loading