This repository has been archived by the owner on Mar 27, 2021. It is now read-only.

Drop metric with row key length bigger than BT limit (#686)
* Drop metric with row key length bigger than BT limit
Sergey Rustamov authored Aug 24, 2020
1 parent fa29634 commit 9c9c6ca
Showing 7 changed files with 60 additions and 1 deletion.
@@ -31,4 +31,6 @@ public interface MetricBackendReporter {
    FutureReporter.Context reportFindSeries();

    FutureReporter.Context reportQueryMetrics();

    void reportWritesDroppedBySize();
}
@@ -77,6 +77,11 @@ public FutureReporter.Context reportQueryMetrics() {
        return NoopFutureReporterContext.get();
    }

    @Override
    public void reportWritesDroppedBySize() {

    }

    private static final NoopMetricBackendReporter instance = new NoopMetricBackendReporter();

    public static NoopMetricBackendReporter get() {
@@ -49,6 +49,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -82,6 +83,7 @@ public abstract class AbstractMetricBackendIT {
    protected boolean brokenSegmentsPr208 = false;
    protected boolean eventSupport = false;
    protected Optional<Integer> maxBatchSize = Optional.empty();
    protected boolean hugeRowKey = true;

    @Rule
    public TestRule setupBackend = (base, description) -> new Statement() {
@@ -373,6 +375,26 @@ public void testWriteAndFetchLongSeries() throws Exception {
        assertEqualMetrics(mc, fetchMetrics(request, true));
    }

    @Test
    public void testWriteHugeMetric() throws Exception {
        assumeTrue("Test huge row key write", hugeRowKey);
        final MetricCollection points = new Points().p(100000L, 42D).build();
        Map<String, String> tags = new HashMap<>();
        for (int i = 0; i < 110; i++) {
            tags.put("VeryLongTagName" + i, "VeryLongValueName" + i);
        }
        final Series hugeSeries = new Series("s1",
            ImmutableSortedMap.copyOf(tags),
            ImmutableSortedMap.of("resource", "a"));
        backend.write(new WriteMetric.Request(hugeSeries, points)).get();

        FetchData.Request request =
            new FetchData.Request(MetricType.POINT, hugeSeries, new DateRange(10000L, 200000L),
                QueryOptions.builder().build());

        assertEquals(Collections.emptyList(), fetchMetrics(request, true));
    }

    private List<MetricCollection> fetchMetrics(FetchData.Request request, boolean slicedFetch)
        throws Exception {
        if (slicedFetch) {
@@ -97,7 +97,9 @@
public class BigtableBackend extends AbstractMetricBackend implements LifeCycles {
    private static final Logger log = LoggerFactory.getLogger(BigtableBackend.class);

    /* maxmimum number of cells supported for each batch mutation */
    /* maximum allowed Bigtable row key size, in bytes */
    public static final int MAX_KEY_ROW_SIZE = 4000;
    /* maximum number of cells supported for each batch mutation */
    public static final int MAX_BATCH_SIZE = 10000;

    public static final QueryTrace.Identifier FETCH_SEGMENT =
@@ -390,6 +392,14 @@ private <T extends Metric> AsyncFuture<WriteMetric> writeBatch(

        for (final Pair<RowKey, Mutations> e : saved) {
            final ByteString rowKeyBytes = rowKeySerializer.serializeFull(e.getKey());

            if (rowKeyBytes.size() >= MAX_KEY_ROW_SIZE) {
                reporter.reportWritesDroppedBySize();
                log.error("Row key length exceeds " + MAX_KEY_ROW_SIZE + " bytes (2): "
                    + rowKeyBytes.size() + " " + rowKeyBytes);
                continue;
            }

            writes.add(client
                .mutateRow(table, rowKeyBytes, e.getValue())
                .directTransform(result -> timer.end()));
@@ -419,6 +429,14 @@ private <T extends Metric> AsyncFuture<WriteMetric> writeOne(
        final RequestTimer<WriteMetric> timer = WriteMetric.timer();

        final ByteString rowKeyBytes = rowKeySerializer.serializeFull(rowKey);

        if (rowKeyBytes.size() >= MAX_KEY_ROW_SIZE) {
            reporter.reportWritesDroppedBySize();
            log.error("Row key length exceeds " + MAX_KEY_ROW_SIZE + " bytes (1): "
                + rowKeyBytes.size() + " " + rowKey);
            return async.resolved().directTransform(result -> timer.end());
        }

        return client
            .mutateRow(table, rowKeyBytes, builder.build())
            .directTransform(result -> timer.end());
@@ -21,5 +21,6 @@ protected void setupSupport() {
        super.setupSupport();

        this.brokenSegmentsPr208 = true;
        this.hugeRowKey = false;
    }
}
@@ -11,6 +11,7 @@ protected void setupSupport() {
        super.setupSupport();

        this.eventSupport = true;
        this.hugeRowKey = false;
    }

    @Override
@@ -81,6 +81,8 @@ public class SemanticMetricBackendReporter implements MetricBackendReporter {
    private final Histogram queryRowMsBetweenSamples;
    // Average samples per mega-seconds :)
    private final Histogram queryRowDensity;
    // Counter of dropped writes due to row key size
    private final Counter writesDroppedBySize;

    public SemanticMetricBackendReporter(SemanticMetricRegistry registry) {
        final MetricId base = MetricId.build().tagged("component", COMPONENT);
@@ -118,6 +120,9 @@ public SemanticMetricBackendReporter(SemanticMetricRegistry registry) {
base.tagged("what", "query-metrics-row-metric-distance", "unit", Units.MILLISECOND));
queryRowDensity = registry.histogram(
base.tagged("what", "query-metrics-row-density", "unit", Units.COUNT));

writesDroppedBySize = registry.counter(
base.tagged("what", "writes-dropped-by-size", "unit", Units.COUNT));
}

@Override
@@ -186,6 +191,11 @@ public void reportOperationEnded() {
        };
    }

    @Override
    public void reportWritesDroppedBySize() {
        writesDroppedBySize.inc();
    }

    @Override
    public FutureReporter.Context reportFindSeries() {
        return findSeries.setup();

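For context, a rough estimate of the row key that testWriteHugeMetric builds: 110 tag pairs of the form "VeryLongTagNameN" / "VeryLongValueNameN" amount to about 3,960 characters of tag text before any serializer framing, so the serialized key lands above MAX_KEY_ROW_SIZE = 4000 and the write is dropped. The sketch below works that arithmetic through; the class name RowKeySizeEstimate and the ~2-byte-per-string framing overhead are illustrative assumptions, since the exact size depends on RowKeySerializer's encoding.

import java.util.HashMap;
import java.util.Map;

// Illustrative estimate only: the per-string framing overhead below is an
// assumption, not the actual RowKeySerializer format used by BigtableBackend.
public class RowKeySizeEstimate {
    public static void main(String[] args) {
        final Map<String, String> tags = new HashMap<>();
        for (int i = 0; i < 110; i++) {
            tags.put("VeryLongTagName" + i, "VeryLongValueName" + i);
        }

        // Raw characters: series key ("s1"), the resource tag ("resource" -> "a"), and all tags.
        int raw = "s1".length() + "resource".length() + "a".length();
        for (final Map.Entry<String, String> e : tags.entrySet()) {
            raw += e.getKey().length() + e.getValue().length();
        }

        // Assumed framing: ~2 bytes per serialized string (220 tag strings plus 3 others).
        final int framed = raw + 2 * (2 * tags.size() + 3);

        System.out.println("raw characters:       " + raw);    // 3971
        System.out.println("with assumed framing: " + framed); // 4417 > MAX_KEY_ROW_SIZE (4000)
    }
}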