Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add safeguards limits for file cache #8989

Merged
merged 2 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add server version as REST response header [#6583](https://github.com/opensearch-project/OpenSearch/issues/6583)
- Start replication checkpointTimers on primary before segments upload to remote store. ([#8221]()https://github.com/opensearch-project/OpenSearch/pull/8221)
- Introduce new static cluster setting to control slice computation for concurrent segment search. ([#8847](https://github.com/opensearch-project/OpenSearch/pull/8884))
- Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand All @@ -33,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807))
- Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792))
- [Remote Store] Add support to restore only unassigned shards of an index ([#8792](https://github.com/opensearch-project/OpenSearch/pull/8792))
- Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndexDescriptor;
Expand Down Expand Up @@ -192,6 +193,11 @@ public void testClusterInfoServiceCollectsInformation() {
logger.info("--> shard size: {}", size);
assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
}

final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is empty on non search nodes", nodeFileCacheStats.size(), Matchers.equalTo(0));

ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getClusterManagerName());
ClusterState state = clusterService.state();
for (ShardRouting shard : state.routingTable().allShards()) {
Expand All @@ -209,6 +215,28 @@ public void testClusterInfoServiceCollectsInformation() {
}
}

public void testClusterInfoServiceCollectsFileCacheInformation() {
internalCluster().startNodes(1);
internalCluster().ensureAtLeastNumSearchAndDataNodes(2);

InternalTestCluster internalTestCluster = internalCluster();
// Get the cluster info service on the cluster-manager node
final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(
ClusterInfoService.class,
internalTestCluster.getClusterManagerName()
);
infoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
ClusterInfo info = infoService.refresh();
assertNotNull("info should not be null", info);
final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is enabled on both search nodes", nodeFileCacheStats.size(), Matchers.equalTo(2));

for (FileCacheStats fileCacheStats : nodeFileCacheStats.values()) {
assertThat("file cache is non empty", fileCacheStats.getTotal().getBytes(), greaterThan(0L));
}
}

public void testClusterInfoServiceInformationClearOnError() {
internalCluster().startNodes(
2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ public String snapshotUuid() {
/**
* Sets the storage type for this request.
*/
RestoreSnapshotRequest storageType(StorageType storageType) {
public RestoreSnapshotRequest storageType(StorageType storageType) {
this.storageType = storageType;
return this;
}
Expand Down
24 changes: 22 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster;

import org.opensearch.Version;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -41,6 +42,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -64,9 +66,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
public static final ClusterInfo EMPTY = new ClusterInfo();
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
Expand All @@ -84,13 +87,15 @@ public ClusterInfo(
final Map<String, DiskUsage> mostAvailableSpaceUsage,
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -110,6 +115,11 @@ public ClusterInfo(StreamInput in) throws IOException {
this.shardSizes = Collections.unmodifiableMap(sizeMap);
this.routingToDataPath = Collections.unmodifiableMap(routingMap);
this.reservedSpace = Collections.unmodifiableMap(reservedSpaceMap);
if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
this.nodeFileCacheStats = in.readMap(StreamInput::readString, FileCacheStats::new);
} else {
this.nodeFileCacheStats = Map.of();
}
}

@Override
Expand All @@ -121,6 +131,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
out.writeMap(this.reservedSpace, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
}
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -194,6 +207,13 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
return Collections.unmodifiableMap(this.mostAvailableSpaceUsage);
}

/**
* Returns a node id to file cache stats mapping for the nodes that have search roles assigned to it.
*/
public Map<String, FileCacheStats> getNodeFileCacheStats() {
return Collections.unmodifiableMap(this.nodeFileCacheStats);
}

/**
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ReceiveTimeoutTransportException;
Expand All @@ -72,6 +73,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
Expand Down Expand Up @@ -110,6 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt

private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
private volatile IndicesStatsSummary indicesStatsSummary;
// null if this node is not currently the cluster-manager
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
Expand All @@ -122,6 +125,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
this.leastAvailableSpaceUsages = Map.of();
this.mostAvailableSpaceUsages = Map.of();
this.nodeFileCacheStats = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool;
this.client = client;
Expand Down Expand Up @@ -208,7 +212,8 @@ public ClusterInfo getClusterInfo() {
mostAvailableSpaceUsages,
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardRoutingToDataPath,
indicesStatsSummary.reservedSpace
indicesStatsSummary.reservedSpace,
nodeFileCacheStats
);
}

Expand All @@ -221,6 +226,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FILE_CACHE_STATS.metricName());
nodesStatsRequest.timeout(fetchTimeout);
client.admin().cluster().nodesStats(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
return latch;
Expand Down Expand Up @@ -264,6 +270,13 @@ public void onResponse(NodesStatsResponse nodesStatsResponse) {
);
leastAvailableSpaceUsages = Collections.unmodifiableMap(leastAvailableUsagesBuilder);
mostAvailableSpaceUsages = Collections.unmodifiableMap(mostAvailableUsagesBuilder);

nodeFileCacheStats = Collections.unmodifiableMap(
nodesStatsResponse.getNodes()
.stream()
.filter(nodeStats -> nodeStats.getNode().isSearchNode())
.collect(Collectors.toMap(nodeStats -> nodeStats.getNode().getId(), NodeStats::getFileCacheStats))
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,16 @@ public ShardsIterator allShardsIncludingRelocationTargets(String[] indices) {
return allShardsSatisfyingPredicate(indices, shardRouting -> true, true);
}

/**
* All the shards on the node which match the predicate
* @param predicate condition to match
* @return iterator over shards matching the predicate
*/
public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predicate) {
String[] indices = indicesRouting.keySet().toArray(new String[0]);
return allShardsSatisfyingPredicate(indices, predicate, false);
}

private ShardsIterator allShardsSatisfyingPredicate(
String[] indices,
Predicate<ShardRouting> predicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,21 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
import static org.opensearch.cluster.routing.RoutingPool.getNodePool;
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
Expand Down Expand Up @@ -167,6 +174,42 @@ public static long sizeOfRelocatingShards(
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();

/*
The following block enables allocation for remote shards within safeguard limits of the filecache.
*/
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
.collect(Collectors.toList());
final long currentNodeRemoteShardSize = remoteShardsOnNode.stream()
.map(ShardRouting::getExpectedShardSize)
.mapToLong(Long::longValue)
.sum();

final long shardSize = getExpectedShardSize(
shardRouting,
0L,
allocation.clusterInfo(),
allocation.snapshotShardSizeInfo(),
allocation.metadata(),
allocation.routingTable()
);

final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
final double dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings());
if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
return allocation.decision(
Decision.NO,
NAME,
"file cache limit reached - remote shard size will exceed configured safeguard ratio"
);
}
return Decision.YES;
}

Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
if (decision != null) {
Expand Down Expand Up @@ -422,6 +465,15 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
}

/*
The following block prevents movement for remote shards since they do not use the local storage as
the primary source of data storage.
*/
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
return Decision.ALWAYS;
}

final ClusterInfo clusterInfo = allocation.clusterInfo();
final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.search.SearchBootstrapSettings;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
Expand Down Expand Up @@ -646,6 +647,7 @@ public void apply(Settings value, Settings current, Settings previous) {

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,

// Settings related to Remote Refresh Segment Pressure
RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.common.settings.Setting;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
Expand Down Expand Up @@ -49,6 +50,21 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

/**
* Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
* This is designed to be a safeguard to prevent oversubscribing a cluster.
* Specify a value of zero for no limit, which is the default for compatibility reasons.
*/
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
"cluster.filecache.remote_data_ratio",
0.0,
0.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
Expand Down
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -945,8 +945,9 @@ protected Node(
clusterModule.getAllocationService(),
metadataCreateIndexService,
metadataIndexUpgradeService,
clusterService.getClusterSettings(),
shardLimitValidator
shardLimitValidator,
indicesService,
clusterInfoService::getClusterInfo
);

final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
Expand Down
Loading