Skip to content

Commit

Permalink
Fix row ranges issue in Bigtable Read. (#31990)
Browse files Browse the repository at this point in the history
  • Loading branch information
ron-gal authored Jul 31, 2024
1 parent eec2068 commit 346011b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,18 @@ private ReadRowsRequest truncateRequest(ReadRowsRequest request, ByteString last
int startCmp = StartPoint.extract(rowRange).compareTo(new StartPoint(lastKey, true));
int endCmp = EndPoint.extract(rowRange).compareTo(new EndPoint(lastKey, true));

if (endCmp <= 0) {
// range end is on or left of the split: skip
continue;
}

RowRange.Builder newRange = rowRange.toBuilder();
if (startCmp > 0) {
// If the startKey is passed the split point than add the whole range
segment.addRowRanges(rowRange);
} else if (endCmp > 0) {
segment.addRowRanges(newRange.build());
} else {
// Row is split, remove all read rowKeys and split RowSet at last buffered Row
RowRange subRange = rowRange.toBuilder().setStartKeyOpen(lastKey).build();
segment.addRowRanges(subRange);
segment.addRowRanges(newRange.setStartKeyOpen(lastKey).build());
}
}
if (segment.getRowRangesCount() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,68 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
Mockito.verify(mockCallMetric, Mockito.times(2)).call("ok");
}

/**
* This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
* as expected. This test checks that a single row is returned from the future.
*
* @throws IOException
*/
@Test
public void testReadSingleRangeAtSegmentLimit() throws Exception {
RowSet.Builder ranges = RowSet.newBuilder();
ranges.addRowRanges(
generateRowRange(
generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE),
generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE - 1)));

// Set up Callable to be returned by stub.createReadRowsCallable()
ServerStreamingCallable<Query, Row> mockCallable = Mockito.mock(ServerStreamingCallable.class);
List<List<Row>> expectedResults =
ImmutableList.of(
generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE), ImmutableList.of());

// Return multiple answers when mockCallable is called
doAnswer(
new MultipleAnswer<Row>(
ImmutableList.of(
generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE * 2),
ImmutableList.of())))
.when(mockCallable)
.call(any(Query.class), any(ResponseObserver.class), any(ApiCallContext.class));

when(mockStub.createReadRowsCallable(any(RowAdapter.class))).thenReturn(mockCallable);
ServerStreamingCallable<Query, Row> callable =
mockStub.createReadRowsCallable(new BigtableServiceImpl.BigtableRowProtoAdapter());
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));

BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
bigtableDataSettings.getProjectId(),
bigtableDataSettings.getInstanceId(),
mockBigtableSource.getTableId().get(),
ranges.build(),
RowFilter.getDefaultInstance(),
SEGMENT_SIZE,
DEFAULT_BYTE_SEGMENT_SIZE,
mockCallMetric);

List<Row> actualResults = new ArrayList<>();
Assert.assertTrue(underTest.start());
do {
actualResults.add(underTest.getCurrentRow());
} while (underTest.advance());

Assert.assertEquals(
expectedResults.stream().flatMap(Collection::stream).collect(Collectors.toList()),
actualResults);

Mockito.verify(mockCallMetric, Mockito.times(2)).call("ok");
}

/**
* This test ensures that all the rows are properly added to the buffer and read. This example
* uses a single range with SEGMENT_SIZE*2+1 rows. Range: [b00000, b00001, ... b00199, b00200)
Expand Down

0 comments on commit 346011b

Please sign in to comment.