Skip to content
This repository has been archived by the owner on Mar 27, 2021. It is now read-only.

Commit

Permalink
Dynamic Metadata Read Indices (#746)
Browse files Browse the repository at this point in the history
* dynamic read index count if config flag true

* relocate config flag, prevent bad case of single mapping && dynamic

* create helper function and add test

* make maxReadIndices configurable for proper delete support. further bifurcation of index mapping classes

* add docs
  • Loading branch information
adsail authored Jan 25, 2021
1 parent a055e34 commit 3597263
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 24 deletions.
3 changes: 3 additions & 0 deletions docs/content/_docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ maxReadIndices: <int> default = 2
# Maximum indices to write to at a time. Minumum of 1.
maxWriteIndices: <int> default = 1

# Support dynamically determining number of indices to read based upon the range of a query
supportDynamicMaxReadIndices: <boolean> default = false

# Pattern to use when creating an index. The pattern must contain a single '%s' that will be
# replaced with the base time stamp of the index.
pattern: <string> default = heroic-%s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public HeroicConfig.Builder build(final ExtraParameters params) {
.map(index::pattern);
params.getDuration("interval")
.map(index::interval);
params.getBoolean("supportDynamicMaxReadIndices")
.map(index::dynamicMaxReadIndices);
params.getInteger("maxReadIndices")
.map(index::maxReadIndices);

final ConnectionModule.Builder connection = ConnectionModule.builder().index(index.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ interface IndexMapping {
*/
@Throws(NoIndexSelectedException::class)
fun delete(type: String, id: String): List<DeleteRequest>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,22 @@ private val DEFAULT_INTERVAL = Duration.of(7, TimeUnit.DAYS)
private const val DEFAULT_MAX_READ_INDICES = 2
private const val DEFAULT_MAX_WRITE_INDICES = 1
private const val DEFAULT_PATTERN = "heroic-%s"
private const val DEFAULT_DYNAMIC_MAX_READ_INDICES = false

private val OPTIONS = IndicesOptions.fromOptions(
true, true, false, false)

data class RotatingIndexMapping(
@JsonProperty("interval") private val intervalDuration: Duration = DEFAULT_INTERVAL,
private val maxReadIndices: Int = DEFAULT_MAX_READ_INDICES,
@JsonProperty("supportDynamicMaxReadIndices") private val supportDynamicMaxReadIndices: Boolean = DEFAULT_DYNAMIC_MAX_READ_INDICES,
@JsonProperty("maxReadIndices") private val maxReadIndices: Int = DEFAULT_MAX_READ_INDICES,
private val maxWriteIndices: Int = DEFAULT_MAX_WRITE_INDICES,
private val pattern: String = DEFAULT_PATTERN,
override val settings: Map<String, Any> = emptyMap()
): IndexMapping {
private val interval = intervalDuration.convert(TimeUnit.MILLISECONDS)
override val template = pattern.format("*")
val dynamicMaxReadIndicesSupport = supportDynamicMaxReadIndices

private fun indices(maxIndices: Int, now: Long, type: String): Array<String> {
val curr = now - (now % interval)
Expand All @@ -58,6 +61,27 @@ data class RotatingIndexMapping(
.toTypedArray()
}

@Throws(NoIndexSelectedException::class)
fun readIndicesInRange(now: Long, type: String, start: Long): Array<String> {
// Query indices within range + 1 to account for caching edge cases
val maxIndicesInRange = dynamicMaxIndicesCount(now, start)

val indices = indices(maxIndicesInRange.toInt(), now, type)

if(indices.isEmpty()) {
throw NoIndexSelectedException()
}

return indices
}

private fun dynamicMaxIndicesCount(now: Long, start: Long): Long {
val r = now - start

// Query indices within range + 1 to account for caching edge cases
return (r / interval) + 1
}

@Throws(NoIndexSelectedException::class)
fun readIndices(now: Long, type: String): Array<String> {
val indices = indices(maxReadIndices, now, type)
Expand All @@ -72,6 +96,11 @@ data class RotatingIndexMapping(
return readIndices(System.currentTimeMillis(), type)
}

@Throws(NoIndexSelectedException::class)
fun readIndicesInRange(type: String, start: Long): Array<String> {
return readIndicesInRange(System.currentTimeMillis(), type, start)
}

fun writeIndices(now: Long, type: String): Array<String> {
return indices(maxWriteIndices, now, type)
}
Expand All @@ -85,18 +114,29 @@ data class RotatingIndexMapping(
return SearchRequest(*readIndices(type)).indicesOptions(OPTIONS)
}

@Throws(NoIndexSelectedException::class)
fun searchInRange(type: String, start: Long): SearchRequest {
return SearchRequest(*readIndicesInRange(type, start)).indicesOptions(OPTIONS)
}

@Throws(NoIndexSelectedException::class)
override fun count(type: String): SearchRequest {
return search(type).source(SearchSourceBuilder().size(0))
}

@Throws(NoIndexSelectedException::class)
fun countInRange(type: String, start: Long): SearchRequest {
return searchInRange(type, start).source(SearchSourceBuilder().size(0))
}

@Throws(NoIndexSelectedException::class)
override fun delete(type: String, id: String): List<DeleteRequest> {
return readIndices(type).map { DeleteRequest(it, id) }
}

class Builder {
var interval: Duration = DEFAULT_INTERVAL
var dynamicMaxReadIndices: Boolean = DEFAULT_DYNAMIC_MAX_READ_INDICES
var maxReadIndices: Int = DEFAULT_MAX_READ_INDICES
var maxWriteIndices: Int = DEFAULT_MAX_WRITE_INDICES
var pattern: String = DEFAULT_PATTERN
Expand All @@ -107,6 +147,11 @@ data class RotatingIndexMapping(
return this
}

fun dynamicMaxReadIndices(dynamicMaxReadIndices: Boolean): Builder {
this.dynamicMaxReadIndices = dynamicMaxReadIndices
return this
}

fun maxReadIndices(maxReadIndices: Int): Builder {
this.maxReadIndices = maxReadIndices
return this
Expand All @@ -130,6 +175,7 @@ data class RotatingIndexMapping(
fun build(): RotatingIndexMapping {
return RotatingIndexMapping(
interval,
dynamicMaxReadIndices,
maxReadIndices,
maxWriteIndices,
pattern,
Expand All @@ -142,4 +188,4 @@ data class RotatingIndexMapping(
@JvmStatic
fun builder() = Builder()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@ data class SingleIndexMapping(
return SingleIndexMapping(index, settings)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class RotatingIndexMappingTest {

@Before
public void setup() {
rotating = new RotatingIndexMapping(interval, maxReadIndices, maxWriteIndices, pattern,
rotating = new RotatingIndexMapping(interval, true, maxReadIndices, maxWriteIndices, pattern,
new HashMap<>());
}

Expand All @@ -28,6 +28,12 @@ public void testReadIndex() throws NoIndexSelectedException {
assertArrayEquals(new String[]{"index-typeA-8000", "index-typeA-7000"}, indices);
}

@Test
public void testReadIndexInRange() throws NoIndexSelectedException {
final String[] indices = rotating.readIndicesInRange(5000, "typeA", 1000);
assertArrayEquals(new String[]{"index-typeA-5000", "index-typeA-4000", "index-typeA-3000", "index-typeA-2000", "index-typeA-1000"}, indices);
}

@Test
public void testEmptyReadIndex() throws NoIndexSelectedException {
final String[] indices = rotating.readIndices(0, "typeA");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.spotify.heroic.elasticsearch.RateLimitedCache;
import com.spotify.heroic.elasticsearch.SearchTransformResult;
import com.spotify.heroic.elasticsearch.index.NoIndexSelectedException;
import com.spotify.heroic.elasticsearch.index.RotatingIndexMapping;
import com.spotify.heroic.filter.AndFilter;
import com.spotify.heroic.filter.FalseFilter;
import com.spotify.heroic.filter.Filter;
Expand Down Expand Up @@ -327,7 +328,17 @@ public AsyncFuture<CountSeries> countSeries(final CountSeries.Request filter) {

final QueryBuilder f = filter(filter.getFilter());

SearchRequest request = c.getIndex().count(METADATA_TYPE);
SearchRequest request;

if (c.getIndex() instanceof RotatingIndexMapping &&
((RotatingIndexMapping) c.getIndex()).getDynamicMaxReadIndicesSupport()) {
final long start = filter.getRange().getStart();
RotatingIndexMapping i = (RotatingIndexMapping) c.getIndex();
request = i.countInRange(METADATA_TYPE, start);
} else {
request = c.getIndex().count(METADATA_TYPE);
}

SearchSourceBuilder sourceBuilder = request.source();
limit.asInteger().ifPresent(sourceBuilder::terminateAfter);
sourceBuilder.query(new BoolQueryBuilder().must(f));
Expand Down Expand Up @@ -414,20 +425,33 @@ public AsyncFuture<DeleteSeries> deleteSeries(final DeleteSeries.Request request

@Override
public AsyncFuture<FindKeys> findKeys(final FindKeys.Request request) {
return doto(c -> {
final QueryBuilder f = filter(request.getFilter());
SearchRequest searchRequest = c.getIndex().search(METADATA_TYPE);
searchRequest.source().query(new BoolQueryBuilder().must(f));

{
final AggregationBuilder terms = AggregationBuilders.terms("terms").field(KEY);
searchRequest.source().aggregation(terms);
}
return doto(
c -> {
final QueryBuilder f = filter(request.getFilter());

final ResolvableFuture<SearchResponse> future = async.future();
future.resolve(c.execute(searchRequest));
SearchRequest searchRequest;
if (c.getIndex() instanceof RotatingIndexMapping &&
((RotatingIndexMapping) c.getIndex()).getDynamicMaxReadIndicesSupport()) {
final long start = request.getRange().getStart();
RotatingIndexMapping ri = (RotatingIndexMapping) c.getIndex();
searchRequest = ri.searchInRange(METADATA_TYPE, start);
} else {
searchRequest = c.getIndex().search(METADATA_TYPE);
}

searchRequest.source().query(new BoolQueryBuilder().must(f));

{
final AggregationBuilder terms = AggregationBuilders.terms("terms").field(KEY);
searchRequest.source().aggregation(terms);
}

return future.directTransform(response -> {
final ResolvableFuture<SearchResponse> future = async.future();
future.resolve(c.execute(searchRequest));

return future.directTransform(
response -> {
final Terms terms = response.getAggregations().get("terms");

final Set<String> keys = new HashSet<>();
Expand All @@ -436,13 +460,13 @@ public AsyncFuture<FindKeys> findKeys(final FindKeys.Request request) {
int duplicates = 0;

for (final Terms.Bucket bucket : terms.getBuckets()) {
if (keys.add(bucket.getKeyAsString())) {
duplicates += 1;
}
if (keys.add(bucket.getKeyAsString())) {
duplicates += 1;
}
}

return new FindKeys(keys, size, duplicates);
});
});
});
}

Expand Down Expand Up @@ -481,8 +505,16 @@ protected <T, O> AsyncFuture<O> entries(
OptionalLimit limit = seriesRequest.getLimit();

return doto(c -> {
SearchRequest request =
c.getIndex().search(METADATA_TYPE).allowPartialSearchResults(false);
SearchRequest request;
if (c.getIndex() instanceof RotatingIndexMapping &&
((RotatingIndexMapping) c.getIndex()).getDynamicMaxReadIndicesSupport()) {
final long start = seriesRequest.getRange().getStart();
RotatingIndexMapping ri = (RotatingIndexMapping) c.getIndex();
request = ri.searchInRange(METADATA_TYPE, start);
} else {
request =
c.getIndex().search(METADATA_TYPE).allowPartialSearchResults(false);
}

request.source()
.size(limit.asMaxInteger(scrollSize))
Expand Down Expand Up @@ -551,7 +583,16 @@ private <T, O> AsyncObservable<O> entriesStream(
OptionalLimit limit = findRequest.getLimit();

return observer -> connection.doto(c -> {
SearchRequest request = c.getIndex().search(METADATA_TYPE).scroll(SCROLL_TIME);
SearchRequest request;
if (c.getIndex() instanceof RotatingIndexMapping &&
((RotatingIndexMapping) c.getIndex()).getDynamicMaxReadIndicesSupport()) {
final long start = findRequest.getRange().getStart();
RotatingIndexMapping ri = (RotatingIndexMapping) c.getIndex();
request = ri.searchInRange(METADATA_TYPE, start);
} else {
request = c.getIndex().search(METADATA_TYPE).scroll(SCROLL_TIME);
}

request.source()
.size(limit.asMaxInteger(scrollSize))
.query(new BoolQueryBuilder().must(filter));
Expand Down

0 comments on commit 3597263

Please sign in to comment.