Skip to content

Commit

Permalink
apacheGH-38318: [Java][FlightRPC] Enable tests that leaked (apache#38719
Browse files Browse the repository at this point in the history
)

### Rationale for this change
This enables tests that are currently disabled to improve coverage and help others build tests based on these.

### What changes are included in this PR?
- Enable tests that were disabled due to flakey memory leaks
- Explicitly close child allocators in these tests to match
  the behavior of FlightServerTestRule which does not leak.
- Change TestBasicAuth to allocate only one server
- Change TestBasicAuth2 to allocate only one server and client
- Fix a bug in testBasucAuth2#asyncPut() not including credentials

### Are these changes tested?
Tested locally.

### Are there any user-facing changes?
No.
* Closes: apache#38318

Authored-by: James Duong <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
jduo authored Nov 15, 2023
1 parent 1e7175d commit 563078f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import com.google.common.collect.ImmutableList;
Expand All @@ -56,17 +57,15 @@ public class TestBasicAuth {
private static final byte[] VALID_TOKEN = "my_token".getBytes(StandardCharsets.UTF_8);

private FlightClient client;
private FlightServer server;
private BufferAllocator allocator;
private static FlightServer server;
private static BufferAllocator allocator;

@Test
public void validAuth() {
client.authenticateBasic(USERNAME, PASSWORD);
Assertions.assertTrue(ImmutableList.copyOf(client.listFlights(Criteria.ALL)).size() == 0);
}

// ARROW-7722: this test occasionally leaks memory
@Disabled
@Test
public void asyncCall() throws Exception {
client.authenticateBasic(USERNAME, PASSWORD);
Expand Down Expand Up @@ -97,7 +96,12 @@ public void didntAuth() {
}

@BeforeEach
public void setup() throws IOException {
public void testSetup() throws IOException {
client = FlightClient.builder(allocator, server.getLocation()).build();
}

@BeforeAll
public static void setup() throws IOException {
allocator = new RootAllocator(Long.MAX_VALUE);
final BasicServerAuthHandler.BasicAuthValidator validator = new BasicServerAuthHandler.BasicAuthValidator() {

Expand Down Expand Up @@ -149,12 +153,19 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
}
}
}).authHandler(new BasicServerAuthHandler(validator)).build().start();
client = FlightClient.builder(allocator, server.getLocation()).build();
}

@AfterEach
public void shutdown() throws Exception {
AutoCloseables.close(client, server, allocator);
public void tearDown() throws Exception {
AutoCloseables.close(client);
}

@AfterAll
public static void shutdown() throws Exception {
AutoCloseables.close(server);

allocator.getChildAllocators().forEach(BufferAllocator::close);
AutoCloseables.close(allocator);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import com.google.common.base.Strings;
Expand All @@ -57,18 +56,18 @@ public class TestBasicAuth2 {
private static final String NO_USERNAME = "";
private static final String PASSWORD_1 = "woohoo1";
private static final String PASSWORD_2 = "woohoo2";
private BufferAllocator allocator;
private FlightServer server;
private FlightClient client;
private FlightClient client2;
private static BufferAllocator allocator;
private static FlightServer server;
private static FlightClient client;
private static FlightClient client2;

@BeforeEach
public void setup() throws Exception {
@BeforeAll
public static void setup() throws Exception {
allocator = new RootAllocator(Long.MAX_VALUE);
startServerAndClient();
}

private FlightProducer getFlightProducer() {
private static FlightProducer getFlightProducer() {
return new NoOpFlightProducer() {
@Override
public void listFlights(CallContext context, Criteria criteria,
Expand Down Expand Up @@ -99,23 +98,26 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
};
}

private void startServerAndClient() throws IOException {
private static void startServerAndClient() throws IOException {
final FlightProducer flightProducer = getFlightProducer();
this.server = FlightServer
server = FlightServer
.builder(allocator, forGrpcInsecure(LOCALHOST, 0), flightProducer)
.headerAuthenticator(new GeneratedBearerTokenAuthenticator(
new BasicCallHeaderAuthenticator(this::validate)))
new BasicCallHeaderAuthenticator(TestBasicAuth2::validate)))
.build().start();
this.client = FlightClient.builder(allocator, server.getLocation())
client = FlightClient.builder(allocator, server.getLocation())
.build();
}

@AfterEach
public void shutdown() throws Exception {
AutoCloseables.close(client, client2, server, allocator);
@AfterAll
public static void shutdown() throws Exception {
AutoCloseables.close(client, client2, server);
client = null;
client2 = null;
server = null;

allocator.getChildAllocators().forEach(BufferAllocator::close);
AutoCloseables.close(allocator);
allocator = null;
}

Expand All @@ -124,7 +126,7 @@ private void startClient2() throws IOException {
.build();
}

private CallHeaderAuthenticator.AuthResult validate(String username, String password) {
private static CallHeaderAuthenticator.AuthResult validate(String username, String password) {
if (Strings.isNullOrEmpty(username)) {
throw CallStatus.UNAUTHENTICATED.withDescription("Credentials not supplied.").toRuntimeException();
}
Expand Down Expand Up @@ -156,14 +158,12 @@ public void validAuthWithMultipleClientsWithDifferentCredentialsWithBearerAuthSe
testValidAuthWithMultipleClientsWithDifferentCredentials(client, client2);
}

// ARROW-7722: this test occasionally leaks memory
@Disabled
@Test
public void asyncCall() throws Exception {
final CredentialCallOption bearerToken = client
.authenticateBasicToken(USERNAME_1, PASSWORD_1).get();
client.listFlights(Criteria.ALL, bearerToken);
try (final FlightStream s = client.getStream(new Ticket(new byte[1]))) {
try (final FlightStream s = client.getStream(new Ticket(new byte[1]), bearerToken)) {
while (s.next()) {
Assertions.assertEquals(4095, s.getRoot().getRowCount());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -209,10 +208,13 @@ public static void setUp() throws Exception {

@AfterAll
public static void tearDown() throws Exception {
close(sqlClient, server, allocator);
close(sqlClient, server);

// Manually close all child allocators.
allocator.getChildAllocators().forEach(BufferAllocator::close);
close(allocator);
}

@Disabled("Memory leak GH-38268")
@Test
public void testGetTablesResultNoSchema() throws Exception {
try (final FlightStream stream =
Expand All @@ -232,7 +234,6 @@ public void testGetTablesResultNoSchema() throws Exception {
}
}

@Disabled("Memory leak GH-38268")
@Test
public void testGetTableTypesResult() throws Exception {
try (final FlightStream stream =
Expand All @@ -251,7 +252,6 @@ public void testGetTableTypesResult() throws Exception {
}
}

@Disabled("Memory leak GH-38268")
@Test
public void testGetSqlInfoResults() throws Exception {
final FlightInfo info = sqlClient.getSqlInfo();
Expand All @@ -263,7 +263,6 @@ public void testGetSqlInfoResults() throws Exception {
}
}

@Disabled("Memory leak GH-38268")
@Test
public void testGetTypeInfo() throws Exception {
FlightInfo flightInfo = sqlClient.getXdbcTypeInfo();
Expand All @@ -280,7 +279,6 @@ public void testGetTypeInfo() throws Exception {
}
}

@Disabled("Memory leak GH-38268")
@Test
public void testExecuteQuery() throws Exception {
try (final FlightStream stream = sqlClient
Expand Down

0 comments on commit 563078f

Please sign in to comment.