diff --git a/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetadataBackendIndexResourceIT.java b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetadataBackendIndexResourceIT.java new file mode 100644 index 000000000..0becb4603 --- /dev/null +++ b/heroic-test/src/main/java/com/spotify/heroic/test/AbstractMetadataBackendIndexResourceIT.java @@ -0,0 +1,458 @@ +/* + * Copyright (c) 2015 Spotify AB. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.heroic.test; + +import static com.spotify.heroic.filter.Filter.and; +import static com.spotify.heroic.filter.Filter.hasTag; +import static com.spotify.heroic.filter.Filter.matchKey; +import static com.spotify.heroic.filter.Filter.matchTag; +import static com.spotify.heroic.filter.Filter.not; +import static com.spotify.heroic.filter.Filter.or; +import static com.spotify.heroic.filter.Filter.startsWith; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeTrue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.spotify.heroic.HeroicConfig; +import com.spotify.heroic.HeroicCore; +import com.spotify.heroic.HeroicCoreInstance; +import com.spotify.heroic.common.DateRange; +import com.spotify.heroic.common.FeatureSet; +import com.spotify.heroic.common.Features; +import com.spotify.heroic.common.GroupMember; +import com.spotify.heroic.common.OptionalLimit; +import com.spotify.heroic.common.Series; +import com.spotify.heroic.dagger.LoadingComponent; +import com.spotify.heroic.filter.FalseFilter; +import com.spotify.heroic.filter.Filter; +import com.spotify.heroic.filter.MatchKeyFilter; +import com.spotify.heroic.filter.TrueFilter; +import com.spotify.heroic.metadata.CountSeries; +import com.spotify.heroic.metadata.DeleteSeries; +import com.spotify.heroic.metadata.FindKeys; +import com.spotify.heroic.metadata.FindSeries; +import com.spotify.heroic.metadata.FindSeriesIds; +import com.spotify.heroic.metadata.FindTags; +import com.spotify.heroic.metadata.MetadataBackend; +import com.spotify.heroic.metadata.MetadataManagerModule; +import com.spotify.heroic.metadata.MetadataModule; +import com.spotify.heroic.metadata.WriteMetadata; +import eu.toolchain.async.AsyncFramework; +import eu.toolchain.async.AsyncFuture; +import eu.toolchain.async.RetryPolicy; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public abstract class AbstractMetadataBackendIndexResourceIT { + protected final String testName = "heroic-it-" + UUID.randomUUID().toString(); + + private AsyncFramework async; + + protected final int numSeries = 3; + + protected final Series s1 = Series.of( + "s1", + ImmutableMap.of("role", "foo"), + ImmutableMap.of("podname", "foo-bar-123") + ); + protected final Series s2 = Series.of( + "s2", + ImmutableMap.of("role", "bar"), + ImmutableMap.of("podname", "foo-bar-456") + ); + protected final Series s3 = Series.of( + "s3", + ImmutableMap.of("role", "baz"), + ImmutableMap.of("podname", "foo-bar-789") + ); + + protected final DateRange range = new DateRange(0L, 0L); + + private HeroicCoreInstance core; + private Features fullFeatures; + + protected MetadataBackend backend; + + protected boolean deleteSupport = true; + protected boolean findTagsSupport = true; + protected boolean orFilterSupport = true; + protected FeatureSet additionalFeatures = FeatureSet.empty(); + + protected abstract MetadataModule setupModule() throws Exception; + + protected void setupConditions() { } + + @Before + public final void abstractSetup() throws Exception { + final HeroicConfig.Builder fragment = HeroicConfig + .builder() + .metadata(MetadataManagerModule.builder() + .backends(ImmutableList.of(setupModule()))); + + core = HeroicCore + .builder() + .setupService(false) + .setupShellServer(false) + .configFragment(fragment) + .build() + .newInstance(); + + core.start().get(); + + async = core.inject(LoadingComponent::async); + + backend = core + .inject(c -> c + .metadataManager() + .groupSet() + .inspectAll() + .stream() + .map(GroupMember::getMember) + .findFirst()) + .orElseThrow(() -> new IllegalStateException("Failed to find backend")); + + final List> writesIndexResource = new ArrayList<>(); + writesIndexResource.add(writeSeries(backend, s1, range)); + writesIndexResource.add(writeSeries(backend, s2, range)); + writesIndexResource.add(writeSeries(backend, s3, range)); + async.collectAndDiscard(writesIndexResource).get(); + + setupConditions(); + + fullFeatures = Features.DEFAULT.applySet(additionalFeatures); + } + + @After + public final void abstractTeardown() throws Exception { + core.shutdown().get(); + } + + @Test + public void findSeriesComplexTest() throws Exception { + final FindSeries.Request f = + new FindSeries.Request( + and(matchKey("s2"), startsWith("role", "ba")), + range, + OptionalLimit.empty(), + Features.DEFAULT + ); + + assertEquals(ImmutableSet.of(s2), backend.findSeries(f).get().getSeries()); + } + + @Test + public void findSeriesComplexTestIndexResource() throws Exception { + final FindSeries.Request f = + new FindSeries.Request( + and(matchKey("s2"), startsWith("role", "ba")), + range, + OptionalLimit.empty(), + Features.DEFAULT + ); + + assertEquals(ImmutableSet.of(s2), backend.findSeries(f).get().getSeries()); + } + + @Test + public void findSeriesComplexWithFeaturesTest() throws Exception { + assumeFalse(additionalFeatures.isEmpty()); + + final FindSeries.Request f = + new FindSeries.Request( + and(matchKey("s2"), startsWith("role", "ba")), + range, + OptionalLimit.empty(), + fullFeatures + ); + + assertEquals(ImmutableSet.of(s2), backend.findSeries(f).get().getSeries()); + } + + @Test + public void findSeriesTest() throws Exception { + final FindSeries.Request f = new FindSeries.Request( + TrueFilter.get(), range, OptionalLimit.empty(), Features.DEFAULT); + + final FindSeries result = backend.findSeries(f).get(); + + assertEquals(ImmutableSet.of(s1, s2, s3), result.getSeries()); + } + + @Test + public void findSeriesWithFeaturesTest() throws Exception { + assumeFalse(additionalFeatures.isEmpty()); + + final FindSeries.Request f = new FindSeries.Request( + TrueFilter.get(), range, OptionalLimit.empty(), fullFeatures); + + final FindSeries result = backend.findSeries(f).get(); + + assertEquals(ImmutableSet.of(s1, s2, s3), result.getSeries()); + } + + @Test + public void findSeriesLimitedTest() throws Exception { + FindSeries.Request req1 = + new FindSeries.Request(TrueFilter.get(), range, OptionalLimit.of(1L), Features.DEFAULT); + final FindSeries r1 = backend + .findSeries(req1) + .get(); + + assertTrue("Result should be limited", r1.getLimited()); + assertEquals("Result size should be same as limit", 1, r1.getSeries().size()); + + FindSeries.Request req2 = + new FindSeries.Request(TrueFilter.get(), range, OptionalLimit.of(3L), Features.DEFAULT); + final FindSeries r2 = backend + .findSeries(req2) + .get(); + + assertFalse("Result should not be limited", r2.getLimited()); + assertEquals("Result size should be all entries", 3, r2.getSeries().size()); + } + + @Test + public void findSeriesLimitedWithFeaturesTest() throws Exception { + assumeFalse(additionalFeatures.isEmpty()); + + FindSeries.Request req1 = + new FindSeries.Request(TrueFilter.get(), range, OptionalLimit.of(1L), fullFeatures); + final FindSeries r1 = backend + .findSeries(req1) + .get(); + + assertTrue("Result should be limited", r1.getLimited()); + assertEquals("Result size should be same as limit", 1, r1.getSeries().size()); + + FindSeries.Request req2 = + new FindSeries.Request(TrueFilter.get(), range, OptionalLimit.of(3L), fullFeatures); + final FindSeries r2 = backend + .findSeries(req2) + .get(); + + assertFalse("Result should not be limited", r2.getLimited()); + assertEquals("Result size should be all entries", 3, r2.getSeries().size()); + } + + @Test + public void findTags() throws Exception { + assumeTrue(findTagsSupport); + + final FindTags.Request request = + new FindTags.Request(TrueFilter.get(), range, OptionalLimit.empty()); + + final FindTags result = backend.findTags(request).get(); + + assertEquals(ImmutableMap.of("role", ImmutableSet.of("bar", "foo", "baz")), + result.getTags()); + } + + @Test + public void findKeys() throws Exception { + final FindKeys.Request request = + new FindKeys.Request(TrueFilter.get(), range, OptionalLimit.empty()); + + final FindKeys result = backend.findKeys(request).get(); + + assertEquals(ImmutableSet.of(s1.getKey(), s2.getKey(), s3.getKey()), result.getKeys()); + } + + @Test + public void countSeriesTest() throws Exception { + final CountSeries.Request f = + new CountSeries.Request(not(matchKey(s2.getKey())), range, OptionalLimit.empty()); + + final CountSeries result = backend.countSeries(f).get(); + assertEquals(2L, result.getCount()); + } + + @Test + public void findSeriesIdsTest() throws Exception { + final FindSeries.Request f = new FindSeries.Request( + not(matchKey(s2.getKey())), + range, + OptionalLimit.empty(), + Features.DEFAULT + ); + + final FindSeriesIds result = backend.findSeriesIds(f).get(); + assertEquals(ImmutableSet.of(s1.hash(), s3.hash()), result.getIds()); + } + + @Test + public void findSeriesIdsWithFeaturesTest() throws Exception { + assumeFalse(additionalFeatures.isEmpty()); + + final FindSeries.Request f = new FindSeries.Request( + not(matchKey(s2.getKey())), + range, + OptionalLimit.empty(), + fullFeatures + ); + + final FindSeriesIds result = backend.findSeriesIds(f).get(); + assertEquals(ImmutableSet.of(s1.hash(), s3.hash()), result.getIds()); + } + + @Test + public void deleteSeriesTest() throws Exception { + assumeTrue(deleteSupport); + + { + final DeleteSeries.Request request = + new DeleteSeries.Request(not(matchKey(s2.getKey())), range, OptionalLimit.empty()); + + backend.deleteSeries(request).get(); + } + + /* deletes are eventually consistent, wait until they are no longer present + * but only for a limited period of time */ + retrySome(() -> { + final FindSeries.Request f = new FindSeries.Request( + TrueFilter.get(), range, OptionalLimit.empty(), Features.DEFAULT); + + final FindSeries result = backend.findSeries(f).get(); + + assertEquals(ImmutableSet.of(s2), result.getSeries()); + }); + } + + @Test + public void deleteSeriesWithFeaturesTest() throws Exception { + assumeTrue(deleteSupport); + assumeFalse(additionalFeatures.isEmpty()); + + { + final DeleteSeries.Request request = + new DeleteSeries.Request(not(matchKey(s2.getKey())), range, OptionalLimit.empty()); + + backend.deleteSeries(request).get(); + } + + /* deletes are eventually consistent, wait until they are no longer present + * but only for a limited period of time */ + retrySome(() -> { + final FindSeries.Request f = new FindSeries.Request( + TrueFilter.get(), range, OptionalLimit.empty(), fullFeatures); + + final FindSeries result = backend.findSeries(f).get(); + + assertEquals(ImmutableSet.of(s2), result.getSeries()); + }); + } + + @Test + public void filterTest() throws Exception { + assertEquals(ImmutableSet.of(s1.hash(), s2.hash(), s3.hash()), findIds(TrueFilter.get())); + assertEquals(ImmutableSet.of(), findIds(FalseFilter.get())); + + assertEquals(ImmutableSet.of(s2.hash()), findIds(matchKey(s2.getKey()))); + + assertEquals(ImmutableSet.of(s1.hash(), s3.hash()), findIds(not(matchKey(s2.getKey())))); + + assertEquals(ImmutableSet.of(s1.hash()), findIds(matchTag("role", "foo"))); + + if (orFilterSupport) { + assertEquals(ImmutableSet.of(s1.hash(), s2.hash()), + findIds(or(matchKey(s1.getKey()), matchKey(s2.getKey())))); + } + + assertEquals(ImmutableSet.of(s1.hash()), + findIds(and(matchKey(s1.getKey()), matchTag("role", "foo")))); + + assertEquals(ImmutableSet.of(s1.hash(), s2.hash(), s3.hash()), findIds(hasTag("role"))); + } + + /** + * Retry action for a given period of time. + * + * @param action Action to retry if failing + */ + private void retrySome(final ThrowingRunnable action) throws Exception { + AssertionError error = null; + + for (int i = 0; i < 10; i++) { + try { + action.run(); + } catch (final AssertionError e) { + if (error != null) { + e.addSuppressed(error); + } + + error = e; + Thread.sleep(100L); + continue; + } + + return; + } + + throw error; + } + + private Set findIds(final Filter filter) throws Exception { + return backend + .findSeriesIds( + new FindSeries.Request(filter, range, OptionalLimit.empty(), Features.DEFAULT)) + .get() + .getIds(); + } + + private AsyncFuture writeSeries( + final MetadataBackend metadata, final Series s, final DateRange range + ) { + final FindSeries.Request f = new FindSeries.Request( + MatchKeyFilter.create(s.getKey()), + range, + OptionalLimit.empty(), + Features.DEFAULT + ); + + return metadata.write(new WriteMetadata.Request(s, range)).lazyTransform(ignore -> async + .retryUntilResolved(() -> metadata.findSeries(f).directTransform(result -> { + if (!result.getSeries().contains(s)) { + throw new RuntimeException("Expected to find the written series"); + } + + return null; + }), RetryPolicy.timed(10000, RetryPolicy.exponential(100, 200))) + .directTransform(r -> null)); + } + + @FunctionalInterface + interface ThrowingRunnable { + void run() throws Exception; + } +} diff --git a/heroic-test/src/test/java/com/spotify/heroic/metadata/elasticsearch/AbstractMetadataBackendIndexResourceKVIT.java b/heroic-test/src/test/java/com/spotify/heroic/metadata/elasticsearch/AbstractMetadataBackendIndexResourceKVIT.java new file mode 100644 index 000000000..36d999c54 --- /dev/null +++ b/heroic-test/src/test/java/com/spotify/heroic/metadata/elasticsearch/AbstractMetadataBackendIndexResourceKVIT.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2020 Spotify AB. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.heroic.metadata.elasticsearch; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableSet; +import com.spotify.heroic.common.Feature; +import com.spotify.heroic.common.FeatureSet; +import com.spotify.heroic.common.Features; +import com.spotify.heroic.common.OptionalLimit; +import com.spotify.heroic.elasticsearch.ClientWrapper; +import com.spotify.heroic.elasticsearch.ConnectionModule; +import com.spotify.heroic.elasticsearch.SearchTransformResult; +import com.spotify.heroic.elasticsearch.index.RotatingIndexMapping; +import com.spotify.heroic.filter.TrueFilter; +import com.spotify.heroic.metadata.FindSeries; +import com.spotify.heroic.metadata.MetadataModule; +import com.spotify.heroic.test.AbstractMetadataBackendIndexResourceIT; +import com.spotify.heroic.test.ElasticSearchTestContainer; +import java.util.Set; +import org.junit.Test; + +public abstract class AbstractMetadataBackendIndexResourceKVIT extends + AbstractMetadataBackendIndexResourceIT { + final static ElasticSearchTestContainer esContainer; + + static { + esContainer = ElasticSearchTestContainer.getInstance(); + } + + protected abstract ClientWrapper setupClient(); + + @Override + protected void setupConditions() { + // TODO: support findTags? + findTagsSupport = false; + + additionalFeatures = FeatureSet.of(Feature.METADATA_LIVE_CURSOR); + } + + @Override + protected MetadataModule setupModule() { + RotatingIndexMapping index = + RotatingIndexMapping.builder().pattern(testName + "-%s").build(); + + return ElasticsearchMetadataModule + .builder() + .templateName(testName) + .configure(true) + .backendType("kv") + .connection(ConnectionModule + .builder() + .index(index) + .clientSetup(setupClient()) + .build()) + .scrollSize(numSeries / 2) + .indexResourceIdentifiers(true) + .build(); + } + + @Test + public void testHashField() throws Exception { + FindSeries.Request f = new FindSeries.Request( + TrueFilter.get(), range, OptionalLimit.empty(), Features.DEFAULT); + + Set hashes = ((MetadataBackendKV) backend).entries( + f, + hit -> (String) hit.getSourceAsMap().get("hash"), + SearchTransformResult::getSet, + request -> { } + ).get(); + + assertEquals(ImmutableSet.of(s1.hash(), s2.hash(), s3.hash()), hashes); + } +} diff --git a/heroic-test/src/test/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendIndexResourceKVRestIT.java b/heroic-test/src/test/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendIndexResourceKVRestIT.java new file mode 100644 index 000000000..719862881 --- /dev/null +++ b/heroic-test/src/test/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendIndexResourceKVRestIT.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2020 Spotify AB. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.heroic.metadata.elasticsearch; + +import static com.spotify.heroic.metadata.elasticsearch.AbstractMetadataBackendIndexResourceKVIT.esContainer; + +import com.spotify.heroic.elasticsearch.ClientWrapper; +import com.spotify.heroic.elasticsearch.RestClientWrapper; +import java.util.List; + +public class MetadataBackendIndexResourceKVRestIT extends AbstractMetadataBackendIndexResourceKVIT { + @Override + protected ClientWrapper setupClient() { + List seeds = List.of( + esContainer.getTcpHost().getHostName() + + ":" + esContainer.getContainer().getMappedPort(9200)); + + return new RestClientWrapper(seeds); + } +} diff --git a/heroic-test/src/test/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendIndexResourceKVTransportIT.java b/heroic-test/src/test/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendIndexResourceKVTransportIT.java new file mode 100644 index 000000000..343169783 --- /dev/null +++ b/heroic-test/src/test/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendIndexResourceKVTransportIT.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2015 Spotify AB. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.heroic.metadata.elasticsearch; + +import com.spotify.heroic.elasticsearch.ClientWrapper; +import com.spotify.heroic.elasticsearch.TransportClientWrapper; +import java.util.List; + +public class MetadataBackendIndexResourceKVTransportIT extends AbstractMetadataBackendIndexResourceKVIT { + @Override + protected ClientWrapper setupClient() { + List seeds = List.of( + esContainer.getTcpHost().getHostName() + + ":" + esContainer.getContainer().getMappedPort(9300)); + + return TransportClientWrapper.builder() + .clusterName("docker-cluster") + .seeds(seeds) + .build(); + } +} diff --git a/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/ElasticsearchMetadataModule.java b/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/ElasticsearchMetadataModule.java index 8333a78d9..5405a2f51 100644 --- a/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/ElasticsearchMetadataModule.java +++ b/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/ElasticsearchMetadataModule.java @@ -98,6 +98,7 @@ public final class ElasticsearchMetadataModule implements MetadataModule, Dynami private final int deleteParallelism; private final boolean configure; private final int scrollSize; + private final boolean indexResourceIdentifiers; private static Supplier defaultSetup = MetadataBackendKV::backendType; @@ -129,7 +130,8 @@ public ElasticsearchMetadataModule( @JsonProperty("templateName") Optional templateName, @JsonProperty("backendType") Optional backendType, @JsonProperty("configure") Optional configure, - @JsonProperty("scrollSize") Optional scrollSize + @JsonProperty("scrollSize") Optional scrollSize, + @JsonProperty("indexResourceIdentifiers") Optional indexResourceIdentifiers ) { this.id = id; this.groups = groups.orElseGet(Groups::empty).or(DEFAULT_GROUP); @@ -152,6 +154,7 @@ public ElasticsearchMetadataModule( this.backendTypeBuilder = backendType.flatMap(bt -> ofNullable(backendTypes.get(bt))).orElse(defaultSetup); this.configure = configure.orElse(false); + this.indexResourceIdentifiers = indexResourceIdentifiers.orElse(false); } @Override @@ -244,6 +247,13 @@ public int scrollSize() { return scrollSize; } + @Provides + @ElasticsearchScope + @Named("indexResourceIdentifiers") + public boolean indexResourceIdentifiers() { + return indexResourceIdentifiers; + } + @Provides @ElasticsearchScope public RateLimitedCache> writeCache(HeroicReporter reporter) { @@ -306,6 +316,7 @@ public static class Builder { private Optional backendType = empty(); private Optional configure = empty(); private Optional scrollSize = empty(); + private Optional indexResourceIdentifiers = empty(); public Builder id(final String id) { checkNotNull(id, "id"); @@ -384,6 +395,11 @@ public Builder scrollSize(int scrollSize) { return this; } + public Builder indexResourceIdentifiers(final boolean indexResourceIdentifiers) { + this.indexResourceIdentifiers = of(indexResourceIdentifiers); + return this; + } + public ElasticsearchMetadataModule build() { return new ElasticsearchMetadataModule( id, @@ -399,7 +415,8 @@ public ElasticsearchMetadataModule build() { templateName, backendType, configure, - scrollSize + scrollSize, + indexResourceIdentifiers ); } } diff --git a/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendKV.java b/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendKV.java index d3981e508..e94874c0d 100644 --- a/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendKV.java +++ b/metadata/elasticsearch/src/main/java/com/spotify/heroic/metadata/elasticsearch/MetadataBackendKV.java @@ -128,6 +128,7 @@ public class MetadataBackendKV extends AbstractElasticsearchMetadataBackend private static final String KEY = "key"; private static final String TAGS = "tags"; private static final String TAG_KEYS = "tag_keys"; + private static final String RESOURCE = "resource"; private static final String HASH_FIELD = "hash"; private static final Character TAG_DELIMITER = '\0'; @@ -143,6 +144,7 @@ public class MetadataBackendKV extends AbstractElasticsearchMetadataBackend private final boolean configure; private final int deleteParallelism; private final int scrollSize; + private final boolean indexResourceIdentifiers; @Inject public MetadataBackendKV( @@ -153,7 +155,8 @@ public MetadataBackendKV( RateLimitedCache> writeCache, @Named("configure") boolean configure, @Named("deleteParallelism") int deleteParallelism, - @Named("scrollSize") int scrollSize + @Named("scrollSize") int scrollSize, + @Named("indexResourceIdentifiers") boolean indexResourceIdentifiers ) { super(async, METADATA_TYPE, reporter); this.groups = groups; @@ -164,6 +167,7 @@ public MetadataBackendKV( this.configure = configure; this.deleteParallelism = deleteParallelism; this.scrollSize = scrollSize; + this.indexResourceIdentifiers = indexResourceIdentifiers; } @Override @@ -197,7 +201,6 @@ public AsyncFuture write(final WriteMetadata.Request request) { return write(request, tracer.getCurrentSpan()); } - @Override public AsyncFuture write( final WriteMetadata.Request request, final Span parentSpan @@ -210,6 +213,7 @@ public AsyncFuture write( final Scope rootScope = tracer.withSpan(rootSpan); final Series series = request.getSeries(); + final String id = series.hash(); rootSpan.putAttribute("id", AttributeValue.stringAttributeValue(id)); @@ -244,7 +248,7 @@ public AsyncFuture write( final XContentBuilder source = XContentFactory.jsonBuilder(); source.startObject(); - buildContext(source, series); + buildContext(source, series, indexResourceIdentifiers); source.endObject(); IndexRequest indexRequest = new IndexRequest(index) @@ -439,7 +443,15 @@ protected Series toSeries(SearchHit hit) { final Iterator> tags = ((List) source.get(TAGS)).stream().map(this::buildTag).iterator(); - return Series.of(key, tags); + if (indexResourceIdentifiers) { + @SuppressWarnings("unchecked") + final Iterator> resource = + ((List) source.get(RESOURCE)).stream().map(this::buildTag).iterator(); + + return Series.of(key, tags, resource); + } else { + return Series.of(key, tags); + } } protected AsyncFuture entries( @@ -577,14 +589,14 @@ private Map.Entry buildTag(String kv) { return Pair.of(tk, tv); } - private static void buildContext(final XContentBuilder b, Series series) throws IOException { + private static void buildContext( + final XContentBuilder b, + Series series, + boolean indexResourceIdentifiers + ) throws IOException { b.field(KEY, series.getKey()); - b.startArray(TAGS); - for (final Map.Entry entry : series.getTags().entrySet()) { - b.value(entry.getKey() + TAG_DELIMITER + entry.getValue()); - } - b.endArray(); + appendData(b, series.getTags(), TAGS, TAG_DELIMITER); b.startArray(TAG_KEYS); for (final Map.Entry entry : series.getTags().entrySet()) { @@ -592,9 +604,23 @@ private static void buildContext(final XContentBuilder b, Series series) throws } b.endArray(); + if (indexResourceIdentifiers) { + appendData(b, series.getResource(), RESOURCE, TAG_DELIMITER); + } + b.field(HASH_FIELD, series.hash()); } + private static void appendData( + final XContentBuilder b, Map data, String name, Character delimiter) + throws IOException { + b.startArray(name); + for (var entry : data.entrySet()) { + b.value(entry.getKey() + delimiter + entry.getValue()); + } + b.endArray(); + } + private static final Filter.Visitor FILTER_CONVERTER = new Filter.Visitor<>() { @Override