diff --git a/build.gradle.kts b/build.gradle.kts index d4bd0fca4fc1..82b3b6356a69 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -293,7 +293,6 @@ tasks.register("javaPreCommit") { dependsOn(":sdks:java:extensions:sorter:build") dependsOn(":sdks:java:extensions:timeseries:build") dependsOn(":sdks:java:extensions:zetasketch:build") - dependsOn(":sdks:java:fn-execution:build") dependsOn(":sdks:java:harness:build") dependsOn(":sdks:java:harness:jmh:build") dependsOn(":sdks:java:io:bigquery-io-perf-tests:build") diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle index e0365fb839ce..39fc1d2a53c9 100644 --- a/runners/direct-java/build.gradle +++ b/runners/direct-java/build.gradle @@ -26,7 +26,6 @@ def dependOnProjects = [":runners:core-construction-java", ":runners:core-java", ":runners:local-java", ":runners:java-fn-execution", - ":sdks:java:fn-execution", ":sdks:java:extensions:avro" ] @@ -95,7 +94,6 @@ dependencies { validatesRunner project(path: project.path, configuration: "shadowTest") permitUnusedDeclared library.java.vendored_grpc_1_60_1 permitUnusedDeclared project(":runners:java-fn-execution") - permitUnusedDeclared project(":sdks:java:fn-execution") permitUnusedDeclared project(":sdks:java:extensions:avro") examplesJavaIntegrationTest project(project.path) examplesJavaIntegrationTest project(":examples:java") diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java index 59d8c55c52d3..fac56890f498 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java @@ -18,11 +18,7 @@ package org.apache.beam.runners.dataflow.worker; import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.when; -import com.google.api.client.http.LowLevelHttpResponse; import com.google.api.client.json.Json; import com.google.api.client.testing.http.MockHttpTransport; import com.google.api.client.testing.http.MockLowLevelHttpRequest; @@ -53,7 +49,6 @@ import org.apache.beam.sdk.util.FastNanoClockAndSleeper; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -61,8 +56,6 @@ import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,36 +68,34 @@ public class DataflowWorkUnitClientTest { private static final String PROJECT_ID = "TEST_PROJECT_ID"; private static final String JOB_ID = "TEST_JOB_ID"; private static final String WORKER_ID = "TEST_WORKER_ID"; + @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); @Rule public TestRule restoreLogging = new RestoreDataflowLoggingMDC(); @Rule public ExpectedException expectedException = ExpectedException.none(); @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper(); - @Mock private MockHttpTransport transport; - @Mock private MockLowLevelHttpRequest request; - private DataflowWorkerHarnessOptions pipelineOptions; - - @Before - public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - when(transport.buildRequest(anyString(), anyString())).thenReturn(request); - doCallRealMethod().when(request).getContentAsString(); + DataflowWorkerHarnessOptions createPipelineOptionsWithTransport(MockHttpTransport transport) { Dataflow service = new Dataflow(transport, Transport.getJsonFactory(), null); - pipelineOptions = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); + DataflowWorkerHarnessOptions pipelineOptions = + PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); pipelineOptions.setProject(PROJECT_ID); pipelineOptions.setJobId(JOB_ID); pipelineOptions.setWorkerId(WORKER_ID); pipelineOptions.setGcpCredential(new TestCredential()); pipelineOptions.setDataflowClient(service); pipelineOptions.setRegion("us-central1"); + return pipelineOptions; } @Test public void testCloudServiceCall() throws Exception { WorkItem workItem = createWorkItem(PROJECT_ID, JOB_ID); - when(request.execute()).thenReturn(generateMockResponse(workItem)); - + MockLowLevelHttpResponse response = generateMockResponse(workItem); + MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response); + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); + DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport); WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); assertEquals(Optional.of(workItem), client.getWorkItem()); @@ -124,30 +115,40 @@ public void testCloudServiceCall() throws Exception { @Test public void testCloudServiceCallMapTaskStagePropagation() throws Exception { - WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); - // Publish and acquire a map task work item, and verify we're now processing that stage. final String stageName = "test_stage_name"; MapTask mapTask = new MapTask(); mapTask.setStageName(stageName); WorkItem workItem = createWorkItem(PROJECT_ID, JOB_ID); workItem.setMapTask(mapTask); - when(request.execute()).thenReturn(generateMockResponse(workItem)); + + MockLowLevelHttpResponse response = generateMockResponse(workItem); + MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response); + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); + DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport); + WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); + assertEquals(Optional.of(workItem), client.getWorkItem()); assertEquals(stageName, DataflowWorkerLoggingMDC.getStageName()); } @Test public void testCloudServiceCallSeqMapTaskStagePropagation() throws Exception { - WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); - // Publish and acquire a seq map task work item, and verify we're now processing that stage. final String stageName = "test_stage_name"; SeqMapTask seqMapTask = new SeqMapTask(); seqMapTask.setStageName(stageName); WorkItem workItem = createWorkItem(PROJECT_ID, JOB_ID); workItem.setSeqMapTask(seqMapTask); - when(request.execute()).thenReturn(generateMockResponse(workItem)); + + MockLowLevelHttpResponse response = generateMockResponse(workItem); + MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response); + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); + DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport); + WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); + assertEquals(Optional.of(workItem), client.getWorkItem()); assertEquals(stageName, DataflowWorkerLoggingMDC.getStageName()); } @@ -157,8 +158,11 @@ public void testCloudServiceCallNoWorkPresent() throws Exception { // If there's no work the service should return an empty work item. WorkItem workItem = new WorkItem(); - when(request.execute()).thenReturn(generateMockResponse(workItem)); - + MockLowLevelHttpResponse response = generateMockResponse(workItem); + MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response); + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); + DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport); WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); assertEquals(Optional.empty(), client.getWorkItem()); @@ -181,8 +185,11 @@ public void testCloudServiceCallNoWorkId() throws Exception { WorkItem workItem = createWorkItem(PROJECT_ID, JOB_ID); workItem.setId(null); - when(request.execute()).thenReturn(generateMockResponse(workItem)); - + MockLowLevelHttpResponse response = generateMockResponse(workItem); + MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response); + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); + DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport); WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); assertEquals(Optional.empty(), client.getWorkItem()); @@ -201,8 +208,11 @@ public void testCloudServiceCallNoWorkId() throws Exception { @Test public void testCloudServiceCallNoWorkItem() throws Exception { - when(request.execute()).thenReturn(generateMockResponse()); - + MockLowLevelHttpResponse response = generateMockResponse(); + MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response); + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); + DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport); WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); assertEquals(Optional.empty(), client.getWorkItem()); @@ -228,8 +238,11 @@ public void testCloudServiceCallMultipleWorkItems() throws Exception { WorkItem workItem1 = createWorkItem(PROJECT_ID, JOB_ID); WorkItem workItem2 = createWorkItem(PROJECT_ID, JOB_ID); - when(request.execute()).thenReturn(generateMockResponse(workItem1, workItem2)); - + MockLowLevelHttpResponse response = generateMockResponse(workItem1, workItem2); + MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response); + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); + DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport); WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); client.getWorkItem(); @@ -242,7 +255,13 @@ public void testReportWorkerMessage_streamingScalingReport() throws Exception { SendWorkerMessagesResponse workerMessage = new SendWorkerMessagesResponse(); workerMessage.setFactory(Transport.getJsonFactory()); response.setContent(workerMessage.toPrettyString()); - when(request.execute()).thenReturn(response); + + MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response); + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); + DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport); + WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); + StreamingScalingReport activeThreadsReport = new StreamingScalingReport() .setActiveThreadCount(1) @@ -251,7 +270,6 @@ public void testReportWorkerMessage_streamingScalingReport() throws Exception { .setMaximumThreadCount(4) .setMaximumBundleCount(5) .setMaximumBytes(6L); - WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); WorkerMessage msg = client.createWorkerMessageFromStreamingScalingReport(activeThreadsReport); client.reportWorkerMessage(Collections.singletonList(msg)); @@ -268,7 +286,13 @@ public void testReportWorkerMessage_perWorkerMetrics() throws Exception { SendWorkerMessagesResponse workerMessage = new SendWorkerMessagesResponse(); workerMessage.setFactory(Transport.getJsonFactory()); response.setContent(workerMessage.toPrettyString()); - when(request.execute()).thenReturn(response); + + MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response); + MockHttpTransport transport = + new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); + DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport); + WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); + PerStepNamespaceMetrics stepNamespaceMetrics = new PerStepNamespaceMetrics() .setOriginalStep("s1") @@ -279,7 +303,6 @@ public void testReportWorkerMessage_perWorkerMetrics() throws Exception { new PerWorkerMetrics() .setPerStepNamespaceMetrics(Collections.singletonList(stepNamespaceMetrics)); - WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG); WorkerMessage perWorkerMetricsMsg = client.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics); client.reportWorkerMessage(Collections.singletonList(perWorkerMetricsMsg)); @@ -290,7 +313,7 @@ public void testReportWorkerMessage_perWorkerMetrics() throws Exception { assertEquals(ImmutableList.of(perWorkerMetricsMsg), actualRequest.getWorkerMessages()); } - private LowLevelHttpResponse generateMockResponse(WorkItem... workItems) throws Exception { + private MockLowLevelHttpResponse generateMockResponse(WorkItem... workItems) throws Exception { MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); response.setContentType(Json.MEDIA_TYPE); LeaseWorkItemResponse lease = new LeaseWorkItemResponse(); diff --git a/sdks/java/expansion-service/build.gradle b/sdks/java/expansion-service/build.gradle index 18df128b9386..6947e53354db 100644 --- a/sdks/java/expansion-service/build.gradle +++ b/sdks/java/expansion-service/build.gradle @@ -41,10 +41,8 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":runners:core-construction-java") implementation project(path: ":runners:java-fn-execution") - implementation project(path: ":sdks:java:fn-execution") implementation project(path: ":sdks:java:harness") permitUnusedDeclared project(path: ":model:fn-execution") - permitUnusedDeclared project(path: ":sdks:java:fn-execution") implementation library.java.jackson_annotations implementation library.java.jackson_databind implementation library.java.jackson_dataformat_yaml diff --git a/sdks/java/fn-execution/build.gradle b/sdks/java/fn-execution/build.gradle deleted file mode 100644 index 01a073bb7ae7..000000000000 --- a/sdks/java/fn-execution/build.gradle +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -plugins { id 'org.apache.beam.module' } -applyJavaNature( - automaticModuleName: 'org.apache.beam.sdk.fn', -) - -description = "Apache Beam :: SDKs :: Java :: Fn Execution" -ext.summary = """Contains code shared across the Beam Java SDK Harness and Java Runners to execute using -the Beam Portability Framework.""" - -dependencies { - testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") - testImplementation library.java.junit - testImplementation library.java.mockito_core - testImplementation library.java.commons_lang3 - testRuntimeOnly library.java.slf4j_jdk14 -} diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle index 52c20e62c498..d3b4267424ce 100644 --- a/sdks/java/harness/build.gradle +++ b/sdks/java/harness/build.gradle @@ -90,8 +90,7 @@ dependencies { permitUnusedDeclared project(path: ":sdks:java:transform-service:launcher") testImplementation library.java.junit testImplementation library.java.mockito_core - shadowTestRuntimeClasspath project(path: ":sdks:java:core", configuration: "shadowTest") - testImplementation project(path: ":sdks:java:fn-execution", configuration: "testRuntimeMigration") + shadowTest project(path: ":sdks:java:core", configuration: "shadowTest") shadowTestRuntimeClasspath library.java.slf4j_jdk14 permitUnusedDeclared library.java.avro } diff --git a/settings.gradle.kts b/settings.gradle.kts index b1b03b55add0..034df0fbda5f 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -203,7 +203,6 @@ include(":sdks:java:extensions:sql:udf") include(":sdks:java:extensions:sql:udf-test-provider") include(":sdks:java:extensions:timeseries") include(":sdks:java:extensions:zetasketch") -include(":sdks:java:fn-execution") include(":sdks:java:harness") include(":sdks:java:harness:jmh") include(":sdks:java:io:amazon-web-services")