diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index deee8876af6f..de9a30ad8189 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -26,16 +26,11 @@ description = "Apache Beam :: Runners :: Prism :: Java" ext.summary = "Support for executing a pipeline on Prism." dependencies { - implementation project(path: ":model:job-management", configuration: "shadow") - implementation project(path: ":model:pipeline", configuration: "shadow") implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation project(path: ":sdks:java:harness", configuration: "shadow") - implementation project(":runners:java-fn-execution") implementation project(":runners:portability:java") implementation library.java.joda_time implementation library.java.slf4j_api - implementation library.java.vendored_grpc_1_60_1 implementation library.java.vendored_guava_32_1_2_jre compileOnly library.java.hamcrest diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismArtifactResolver.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismArtifactResolver.java deleted file mode 100644 index db56bc6047ca..000000000000 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismArtifactResolver.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import java.util.Optional; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.construction.DefaultArtifactResolver; -import org.apache.beam.sdk.util.construction.PipelineTranslation; -import org.apache.beam.sdk.util.construction.SdkComponents; - -/** - * The {@link PrismArtifactResolver} converts a {@link Pipeline} to a {@link RunnerApi.Pipeline} via - * resolving {@link RunnerApi.ArtifactInformation}. - */ -@AutoValue -abstract class PrismArtifactResolver { - - /** - * Instantiates a {@link PrismArtifactResolver} from the {@param pipeline}, applying defaults to - * the remaining dependencies. - */ - static PrismArtifactResolver of(Pipeline pipeline) { - return PrismArtifactResolver.builder().setPipeline(pipeline).build(); - } - - static Builder builder() { - return new AutoValue_PrismArtifactResolver.Builder(); - } - - /** - * Converts the {@link #getPipeline()} using {@link PipelineTranslation#toProto} and {@link - * #getDelegate()}'s {@link - * org.apache.beam.sdk.util.construction.ArtifactResolver#resolveArtifacts}. - */ - RunnerApi.Pipeline resolvePipelineProto() { - RunnerApi.Pipeline result = PipelineTranslation.toProto(getPipeline(), getSdkComponents()); - return getDelegate().resolveArtifacts(result); - } - - /** - * {@link PrismArtifactResolver} delegates to {@link - * org.apache.beam.sdk.util.construction.ArtifactResolver} to transform {@link - * RunnerApi.ArtifactInformation}. Defaults to {@link DefaultArtifactResolver#INSTANCE} if not - * set. - */ - abstract org.apache.beam.sdk.util.construction.ArtifactResolver getDelegate(); - - /** The {@link Pipeline} from which {@link PrismArtifactResolver#resolvePipelineProto()}. */ - abstract Pipeline getPipeline(); - - /** - * SDK objects that will be represented by {@link - * org.apache.beam.model.pipeline.v1.RunnerApi.Components}. Instantiated via {@link - * SdkComponents#create(PipelineOptions)} by default, where {@link PipelineOptions} are acquired - * from {@link #getPipeline}'s {@link Pipeline#getOptions}. - */ - abstract SdkComponents getSdkComponents(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setDelegate( - org.apache.beam.sdk.util.construction.ArtifactResolver artifactResolver); - - abstract Optional getDelegate(); - - abstract Builder setSdkComponents(SdkComponents sdkComponents); - - abstract Optional getSdkComponents(); - - abstract Builder setPipeline(Pipeline pipeline); - - abstract Optional getPipeline(); - - abstract PrismArtifactResolver autoBuild(); - - final PrismArtifactResolver build() { - if (!getDelegate().isPresent()) { - setDelegate(DefaultArtifactResolver.INSTANCE); - } - - if (!getSdkComponents().isPresent()) { - checkState(getPipeline().isPresent()); - setSdkComponents(SdkComponents.create(getPipeline().get().getOptions())); - } - - return autoBuild(); - } - } -} diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismArtifactStager.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismArtifactStager.java deleted file mode 100644 index f1d99a213eea..000000000000 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismArtifactStager.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc; -import org.apache.beam.model.jobmanagement.v1.JobApi; -import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; -import org.apache.beam.model.pipeline.v1.Endpoints; -import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; -import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Stages {@link org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline} artifacts of prepared jobs. - */ -@AutoValue -abstract class PrismArtifactStager implements AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(PrismArtifactStager.class); - - /** - * Instantiate a {@link PrismArtifactStager} via call to {@link #of(String, String)}, assigning - * {@link Builder#setStagingEndpoint} using {@param prepareJobResponse} {@link - * JobApi.PrepareJobResponse#getArtifactStagingEndpoint} and {@link - * JobApi.PrepareJobResponse#getStagingSessionToken}. - */ - static PrismArtifactStager of(JobApi.PrepareJobResponse prepareJobResponse) { - return of( - prepareJobResponse.getArtifactStagingEndpoint().getUrl(), - prepareJobResponse.getStagingSessionToken()); - } - - /** - * Instantiates a {@link PrismArtifactStager} from the {@param stagingEndpoint} URL and {@param - * stagingSessionToken} to instantiate the {@link #getRetrievalService}, {@link - * #getManagedChannel}, and {@link #getStagingServiceStub} defaults. See the referenced getters - * for more details. - */ - static PrismArtifactStager of(String stagingEndpoint, String stagingSessionToken) { - return PrismArtifactStager.builder() - .setStagingEndpoint(stagingEndpoint) - .setStagingSessionToken(stagingSessionToken) - .build(); - } - - static Builder builder() { - return new AutoValue_PrismArtifactStager.Builder(); - } - - /** - * Stage the {@link org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline} artifacts via {@link - * ArtifactStagingService#offer} supplying {@link #getRetrievalService}, {@link - * #getStagingServiceStub}, and {@link #getStagingSessionToken}. - */ - void stage() throws ExecutionException, InterruptedException { - LOG.info("staging artifacts at {}", getStagingEndpoint()); - ArtifactStagingService.offer( - getRetrievalService(), getStagingServiceStub(), getStagingSessionToken()); - } - - /** The URL of the {@link ArtifactStagingService}. */ - abstract String getStagingEndpoint(); - - /** - * Token associated with a staging session and acquired from a {@link - * JobServiceGrpc.JobServiceStub#prepare}'s {@link JobApi.PrepareJobResponse}. - */ - abstract String getStagingSessionToken(); - - /** - * The service that retrieves artifacts; defaults to instantiating from the default {@link - * ArtifactRetrievalService#ArtifactRetrievalService()} constructor. - */ - abstract ArtifactRetrievalService getRetrievalService(); - - /** - * Used to instantiate the {@link #getStagingServiceStub}. By default, instantiates using {@link - * ManagedChannelFactory#forDescriptor(Endpoints.ApiServiceDescriptor)}, where {@link - * Endpoints.ApiServiceDescriptor} is instantiated via {@link - * Endpoints.ApiServiceDescriptor.Builder#setUrl(String)} and the URL provided by {@link - * #getStagingEndpoint}. - */ - abstract ManagedChannel getManagedChannel(); - - /** - * Required by {@link ArtifactStagingService#offer}. By default, instantiates using {@link - * ArtifactStagingServiceGrpc#newStub} and {@link #getManagedChannel}. - */ - abstract ArtifactStagingServiceGrpc.ArtifactStagingServiceStub getStagingServiceStub(); - - @Override - public void close() { - LOG.info("shutting down {}", PrismArtifactStager.class); - getRetrievalService().close(); - getManagedChannel().shutdown(); - try { - getManagedChannel().awaitTermination(3000L, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { - } - } - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setStagingEndpoint(String stagingEndpoint); - - abstract Optional getStagingEndpoint(); - - abstract Builder setStagingSessionToken(String stagingSessionToken); - - abstract Builder setRetrievalService(ArtifactRetrievalService retrievalService); - - abstract Optional getRetrievalService(); - - abstract Builder setManagedChannel(ManagedChannel managedChannel); - - abstract Optional getManagedChannel(); - - abstract Builder setStagingServiceStub( - ArtifactStagingServiceGrpc.ArtifactStagingServiceStub stub); - - abstract Optional - getStagingServiceStub(); - - abstract PrismArtifactStager autoBuild(); - - final PrismArtifactStager build() { - - checkState(getStagingEndpoint().isPresent(), "missing staging endpoint"); - ManagedChannelFactory channelFactory = ManagedChannelFactory.createDefault(); - - if (!getManagedChannel().isPresent()) { - Endpoints.ApiServiceDescriptor descriptor = - Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getStagingEndpoint().get()).build(); - setManagedChannel(channelFactory.forDescriptor(descriptor)); - } - - if (!getStagingServiceStub().isPresent()) { - setStagingServiceStub(ArtifactStagingServiceGrpc.newStub(getManagedChannel().get())); - } - - if (!getRetrievalService().isPresent()) { - setRetrievalService(new ArtifactRetrievalService()); - } - - return autoBuild(); - } - } -} diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismJobManager.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismJobManager.java deleted file mode 100644 index e461e92c4749..000000000000 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismJobManager.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import java.io.Closeable; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import org.apache.beam.model.jobmanagement.v1.JobApi; -import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; -import org.apache.beam.model.pipeline.v1.Endpoints; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; -import org.apache.beam.sdk.options.PortablePipelineOptions; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; -import org.joda.time.Duration; - -/** - * A wrapper for {@link JobServiceGrpc.JobServiceBlockingStub} that {@link #close}es when {@link - * StateListener#onStateChanged} is invoked with a {@link PipelineResult.State} that is {@link - * PipelineResult.State#isTerminal}. - */ -@AutoValue -abstract class PrismJobManager implements StateListener, Closeable { - - /** - * Instantiate a {@link PrismJobManager} with {@param options}, assigning {@link #getEndpoint} - * from {@link PortablePipelineOptions#getJobEndpoint} and {@link #getTimeout} from {@link - * PortablePipelineOptions#getJobServerTimeout}. Defaults the instantiations of {@link - * #getManagedChannel} and {@link #getBlockingStub}. See respective getters for more details. - */ - static PrismJobManager of(PortablePipelineOptions options) { - return builder() - .setEndpoint(options.getJobEndpoint()) - .setTimeout(Duration.standardSeconds(options.getJobServerTimeout())) - .build(); - } - - static Builder builder() { - return new AutoValue_PrismJobManager.Builder(); - } - - /** - * Executes {@link #getBlockingStub()}'s {@link JobServiceGrpc.JobServiceBlockingStub#prepare} - * method. - */ - JobApi.PrepareJobResponse prepare(JobApi.PrepareJobRequest request) { - return getBlockingStub().prepare(request); - } - - /** - * Executes {@link #getBlockingStub()}'s {@link JobServiceGrpc.JobServiceBlockingStub#run} method. - */ - JobApi.RunJobResponse run(JobApi.RunJobRequest request) { - return getBlockingStub().run(request); - } - - /** The {@link JobServiceGrpc} endpoint. */ - abstract String getEndpoint(); - - /** The {@link JobServiceGrpc} timeout. */ - abstract Duration getTimeout(); - - /** The {@link #getBlockingStub}'s channel. Defaulted from the {@link #getEndpoint()}. */ - abstract ManagedChannel getManagedChannel(); - - /** The wrapped service defaulted using the {@link #getManagedChannel}. */ - abstract JobServiceGrpc.JobServiceBlockingStub getBlockingStub(); - - /** Shuts down {@link #getManagedChannel}, if not {@link #isShutdown}. */ - @Override - public void close() { - if (isShutdown()) { - return; - } - getManagedChannel().shutdown(); - try { - getManagedChannel().awaitTermination(3000L, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { - } - } - - /** Queries whether {@link #getManagedChannel} {@link ManagedChannel#isShutdown}. */ - boolean isShutdown() { - return getManagedChannel().isShutdown(); - } - - /** - * Override of {@link StateListener#onStateChanged}. Invokes {@link #close} when {@link - * PipelineResult.State} {@link PipelineResult.State#isTerminal}. - */ - @Override - public void onStateChanged(PipelineResult.State state) { - if (state.isTerminal()) { - close(); - } - } - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setEndpoint(String endpoint); - - abstract Optional getEndpoint(); - - abstract Builder setTimeout(Duration timeout); - - abstract Optional getTimeout(); - - abstract Builder setManagedChannel(ManagedChannel managedChannel); - - abstract Optional getManagedChannel(); - - abstract Builder setBlockingStub(JobServiceGrpc.JobServiceBlockingStub blockingStub); - - abstract Optional getBlockingStub(); - - abstract PrismJobManager autoBuild(); - - final PrismJobManager build() { - - checkState(getEndpoint().isPresent(), "endpoint is not set"); - checkState(getTimeout().isPresent(), "timeout is not set"); - - if (!getManagedChannel().isPresent()) { - ManagedChannelFactory channelFactory = ManagedChannelFactory.createDefault(); - - setManagedChannel( - channelFactory.forDescriptor( - Endpoints.ApiServiceDescriptor.newBuilder().setUrl(getEndpoint().get()).build())); - } - - if (!getBlockingStub().isPresent()) { - setBlockingStub( - JobServiceGrpc.newBlockingStub(getManagedChannel().get()) - .withDeadlineAfter(getTimeout().get().getMillis(), TimeUnit.MILLISECONDS) - .withWaitForReady()); - } - - return autoBuild(); - } - } -} diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/StateListener.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/StateListener.java deleted file mode 100644 index 89f537e4f812..000000000000 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/StateListener.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import org.apache.beam.sdk.PipelineResult; - -/** Listens for {@link PipelineResult.State} changes reported by the {@link StateWatcher}. */ -interface StateListener { - - /** Callback invoked when {@link StateWatcher} discovers a {@link PipelineResult.State} change. */ - void onStateChanged(PipelineResult.State state); -} diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/StateWatcher.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/StateWatcher.java deleted file mode 100644 index fe9eb84a72b5..000000000000 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/StateWatcher.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import com.google.auto.value.AutoValue; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import org.apache.beam.model.jobmanagement.v1.JobApi; -import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ChannelCredentials; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.InsecureChannelCredentials; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.netty.NettyChannelBuilder; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; - -/** - * {@link StateWatcher} {@link #watch}es for and reports {@link PipelineResult.State} changes to - * {@link StateListener}s. - */ -@AutoValue -abstract class StateWatcher implements AutoCloseable { - - private Optional latestState = Optional.empty(); - - /** - * Instantiates a {@link StateWatcher} with {@link InsecureChannelCredentials}. {@link - * StateWatcher} will report to each {@link StateListener} of {@param listeners} of any changed - * {@link PipelineResult.State}. - */ - static StateWatcher insecure(String endpoint, StateListener... listeners) { - return StateWatcher.builder() - .setEndpoint(HostAndPort.fromString(endpoint)) - .setCredentials(InsecureChannelCredentials.create()) - .setListeners(Arrays.asList(listeners)) - .build(); - } - - /** - * Watch for a Job's {@link PipelineResult.State} change. A {@link - * org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest} identifies a Job to watch via - * its {@link JobApi.GetJobStateRequest#getJobId()}. The method is blocking until the {@link - * JobApi.JobStateEvent} {@link StreamObserver#onCompleted()}. - */ - void watch(String jobId) { - JobApi.GetJobStateRequest request = - JobApi.GetJobStateRequest.newBuilder().setJobId(jobId).build(); - Iterator iterator = getJobServiceBlockingStub().getStateStream(request); - while (iterator.hasNext()) { - JobApi.JobStateEvent event = iterator.next(); - PipelineResult.State state = PipelineResult.State.valueOf(event.getState().name()); - publish(state); - } - } - - private void publish(PipelineResult.State state) { - if (latestState.isPresent() && latestState.get().equals(state)) { - return; - } - latestState = Optional.of(state); - for (StateListener listener : getListeners()) { - listener.onStateChanged(state); - } - } - - static Builder builder() { - return new AutoValue_StateWatcher.Builder(); - } - - abstract HostAndPort getEndpoint(); - - abstract ChannelCredentials getCredentials(); - - abstract List getListeners(); - - abstract ManagedChannel getManagedChannel(); - - abstract JobServiceGrpc.JobServiceBlockingStub getJobServiceBlockingStub(); - - @Override - public void close() { - getManagedChannel().shutdown(); - try { - getManagedChannel().awaitTermination(3000L, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { - } - } - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setEndpoint(HostAndPort endpoint); - - abstract Optional getEndpoint(); - - abstract Builder setCredentials(ChannelCredentials credentials); - - abstract Optional getCredentials(); - - abstract Builder setListeners(List listeners); - - abstract Builder setManagedChannel(ManagedChannel managedChannel); - - abstract Builder setJobServiceBlockingStub( - JobServiceGrpc.JobServiceBlockingStub jobServiceBlockingStub); - - abstract StateWatcher autoBuild(); - - final StateWatcher build() { - if (!getEndpoint().isPresent()) { - throw new IllegalStateException("missing endpoint"); - } - if (!getCredentials().isPresent()) { - throw new IllegalStateException("missing credentials"); - } - HostAndPort endpoint = getEndpoint().get(); - ManagedChannel channel = - NettyChannelBuilder.forAddress( - endpoint.getHost(), endpoint.getPort(), getCredentials().get()) - .build(); - setManagedChannel(channel); - setJobServiceBlockingStub(JobServiceGrpc.newBlockingStub(channel)); - - return autoBuild(); - } - } -} diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/WorkerService.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/WorkerService.java deleted file mode 100644 index 289ffac64f8a..000000000000 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/WorkerService.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; - -import org.apache.beam.fn.harness.ExternalWorkerService; -import org.apache.beam.model.pipeline.v1.Endpoints; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.fn.server.GrpcFnServer; -import org.apache.beam.sdk.options.PortablePipelineOptions; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server; -import org.checkerframework.checker.nullness.qual.MonotonicNonNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An {@link ExternalWorkerService} {@link GrpcFnServer} encapsulation that {@link #stop}s when - * {@link StateListener#onStateChanged} is invoked with a {@link PipelineResult.State} that is - * {@link PipelineResult.State#isTerminal}. - */ -class WorkerService implements StateListener { - - private static final Logger LOG = LoggerFactory.getLogger(WorkerService.class); - - private final ExternalWorkerService worker; - private @MonotonicNonNull GrpcFnServer server; - - WorkerService(PortablePipelineOptions options) { - this.worker = new ExternalWorkerService(options); - } - - /** Start the {@link ExternalWorkerService}. */ - void start() throws Exception { - if (server != null && !server.getServer().isShutdown()) { - return; - } - - server = worker.start(); - LOG.info("Starting worker service at {}", getApiServiceDescriptorUrl()); - } - - /** - * Queries whether the {@link ExternalWorkerService} {@link GrpcFnServer}'s {@link Server} is - * running. - */ - boolean isRunning() { - if (server == null) { - return false; - } - return !server.getServer().isShutdown(); - } - - /** - * Queries the {@link Endpoints.ApiServiceDescriptor#getUrl} of the {@link ExternalWorkerService} - * {@link GrpcFnServer}'s {@link Server}. Throws an exception if the {@link WorkerService} has not - * {@link WorkerService#start}ed. - */ - String getApiServiceDescriptorUrl() { - return checkStateNotNull(server, "worker service not started") - .getApiServiceDescriptor() - .getUrl(); - } - - /** - * Updates {@link PortablePipelineOptions#getDefaultEnvironmentConfig} with {@link - * #getApiServiceDescriptorUrl}. Throws an exception if the {@link WorkerService} has not {@link - * WorkerService#start}ed. - */ - PortablePipelineOptions updateDefaultEnvironmentConfig(PortablePipelineOptions options) { - options.setDefaultEnvironmentConfig(getApiServiceDescriptorUrl()); - return options; - } - - /** - * Overrides {@link StateListener#onStateChanged}, invoking {@link #stop} when {@link - * PipelineResult.State#isTerminal}. - */ - @Override - public void onStateChanged(PipelineResult.State state) { - if (state.isTerminal()) { - stop(); - } - } - - /** - * Stops the {@link ExternalWorkerService} {@link GrpcFnServer}'s {@link Server}. If not {@link - * WorkerService#isRunning()}, then calling stop is a noop. - */ - void stop() { - if (server == null || server.getServer().isShutdown()) { - return; - } - LOG.info("Stopping worker service at {}", getApiServiceDescriptorUrl()); - try { - server.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismArtifactResolverTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismArtifactResolverTest.java deleted file mode 100644 index ef4646f02347..000000000000 --- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismArtifactResolverTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static com.google.common.truth.Truth.assertThat; - -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Impulse; -import org.apache.beam.sdk.util.construction.BeamUrns; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link PrismArtifactResolver}. */ -@RunWith(JUnit4.class) -public class PrismArtifactResolverTest { - @Test - public void resolvesPipeline() { - Pipeline pipeline = Pipeline.create(); - pipeline.apply(Impulse.create()); - PrismArtifactResolver underTest = PrismArtifactResolver.of(pipeline); - RunnerApi.Pipeline pipelineProto = underTest.resolvePipelineProto(); - RunnerApi.Components components = pipelineProto.getComponents(); - assertThat(components.getTransformsMap()).containsKey("Impulse"); - assertThat(components.getCodersMap()).containsKey("ByteArrayCoder"); - assertThat(components.getEnvironmentsMap()) - .containsKey(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)); - } -} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismArtifactStagerTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismArtifactStagerTest.java deleted file mode 100644 index d3ac8a72eafb..000000000000 --- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismArtifactStagerTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static com.google.common.truth.Truth.assertThat; -import static org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.EMBEDDED_ARTIFACT_URN; -import static org.junit.Assert.assertThrows; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; -import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.apache.commons.io.output.ByteArrayOutputStream; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link PrismArtifactStager}. */ -@RunWith(JUnit4.class) -public class PrismArtifactStagerTest { - - @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - - final ArtifactStagingService stagingService = - new ArtifactStagingService(new TestDestinationProvider()); - - @Test - public void givenValidArtifacts_stages() - throws IOException, ExecutionException, InterruptedException { - PrismArtifactStager underTest = prismArtifactStager(validArtifacts()); - assertThat(underTest.getManagedChannel().isShutdown()).isFalse(); - underTest.stage(); - assertThat(stagingService.getStagedArtifacts(underTest.getStagingSessionToken())).isNotEmpty(); - underTest.close(); - assertThat(underTest.getManagedChannel().isShutdown()).isTrue(); - } - - @Test - public void givenErrors_performsGracefulCleanup() throws IOException { - PrismArtifactStager underTest = prismArtifactStager(invalidArtifacts()); - assertThat(underTest.getManagedChannel().isShutdown()).isFalse(); - ExecutionException error = assertThrows(ExecutionException.class, underTest::stage); - assertThat(error.getMessage()).contains("Unexpected artifact type: invalid-type-urn"); - assertThat(underTest.getManagedChannel().isShutdown()).isFalse(); - underTest.close(); - assertThat(underTest.getManagedChannel().isShutdown()).isTrue(); - } - - private PrismArtifactStager prismArtifactStager( - Map> artifacts) throws IOException { - String serverName = InProcessServerBuilder.generateName(); - ArtifactRetrievalService retrievalService = new ArtifactRetrievalService(); - String stagingToken = "staging-token"; - stagingService.registerJob(stagingToken, artifacts); - - grpcCleanup.register( - InProcessServerBuilder.forName(serverName) - .directExecutor() - .addService(stagingService) - .addService(retrievalService) - .build() - .start()); - - ManagedChannel channel = - grpcCleanup.register(InProcessChannelBuilder.forName(serverName).build()); - - return PrismArtifactStager.builder() - .setStagingEndpoint("ignore") - .setStagingSessionToken(stagingToken) - .setManagedChannel(channel) - .build(); - } - - private Map> validArtifacts() { - return ImmutableMap.of( - "env1", - Collections.singletonList( - RunnerApi.ArtifactInformation.newBuilder() - .setTypeUrn(EMBEDDED_ARTIFACT_URN) - .setTypePayload( - RunnerApi.EmbeddedFilePayload.newBuilder() - .setData(ByteString.copyFromUtf8("type-payload")) - .build() - .toByteString()) - .setRoleUrn("role-urn") - .build())); - } - - private Map> invalidArtifacts() { - return ImmutableMap.of( - "env1", - Collections.singletonList( - RunnerApi.ArtifactInformation.newBuilder() - .setTypeUrn("invalid-type-urn") - .setTypePayload( - RunnerApi.EmbeddedFilePayload.newBuilder() - .setData(ByteString.copyFromUtf8("type-payload")) - .build() - .toByteString()) - .setRoleUrn("role-urn") - .build())); - } - - private static class TestDestinationProvider - implements ArtifactStagingService.ArtifactDestinationProvider { - - @Override - public ArtifactStagingService.ArtifactDestination getDestination( - String stagingToken, String name) throws IOException { - return ArtifactStagingService.ArtifactDestination.create( - EMBEDDED_ARTIFACT_URN, ByteString.EMPTY, new ByteArrayOutputStream()); - } - - @Override - public void removeStagedArtifacts(String stagingToken) throws IOException {} - } -} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismJobManagerTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismJobManagerTest.java deleted file mode 100644 index 1e38e4f8d12e..000000000000 --- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismJobManagerTest.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertThrows; - -import java.io.IOException; -import java.util.Optional; -import org.apache.beam.model.jobmanagement.v1.JobApi; -import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; -import org.apache.beam.model.pipeline.v1.Endpoints; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.transforms.Impulse; -import org.apache.beam.sdk.util.construction.PipelineTranslation; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule; -import org.joda.time.Duration; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link PrismJobManager}. */ -@RunWith(JUnit4.class) -public class PrismJobManagerTest { - @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - - @Rule public TestName testName = new TestName(); - - @Test - public void givenPrepareError_forwardsException_canGracefulShutdown() { - TestJobService service = - new TestJobService().withErrorResponse(new RuntimeException(testName.getMethodName())); - PrismJobManager underTest = prismJobManager(service); - assertThat(underTest.isShutdown()).isFalse(); - assertThrows( - RuntimeException.class, - () -> - underTest.prepare( - JobApi.PrepareJobRequest.newBuilder().setPipeline(pipelineOf()).build())); - assertThat(underTest.isShutdown()).isFalse(); - underTest.close(); - assertThat(underTest.isShutdown()).isTrue(); - } - - @Test - public void givenPrepareSuccess_forwardsResponse_canGracefulShutdown() { - TestJobService service = - new TestJobService() - .withPrepareJobResponse( - JobApi.PrepareJobResponse.newBuilder() - .setStagingSessionToken("token") - .setPreparationId("preparationId") - .setArtifactStagingEndpoint( - Endpoints.ApiServiceDescriptor.newBuilder() - .setUrl("localhost:1234") - .build()) - .build()); - PrismJobManager underTest = prismJobManager(service); - assertThat(underTest.isShutdown()).isFalse(); - JobApi.PrepareJobResponse response = - underTest.prepare(JobApi.PrepareJobRequest.newBuilder().setPipeline(pipelineOf()).build()); - assertThat(underTest.isShutdown()).isFalse(); - assertThat(response.getStagingSessionToken()).isEqualTo("token"); - assertThat(response.getPreparationId()).isEqualTo("preparationId"); - underTest.close(); - assertThat(underTest.isShutdown()).isTrue(); - } - - @Test - public void givenRunError_forwardsException_canGracefulShutdown() { - TestJobService service = - new TestJobService().withErrorResponse(new RuntimeException(testName.getMethodName())); - PrismJobManager underTest = prismJobManager(service); - assertThat(underTest.isShutdown()).isFalse(); - assertThrows( - RuntimeException.class, - () -> - underTest.run(JobApi.RunJobRequest.newBuilder().setPreparationId("prepareId").build())); - assertThat(underTest.isShutdown()).isFalse(); - underTest.close(); - assertThat(underTest.isShutdown()).isTrue(); - } - - @Test - public void givenRunSuccess_forwardsResponse_canGracefulShutdown() { - TestJobService service = - new TestJobService() - .withRunJobResponse(JobApi.RunJobResponse.newBuilder().setJobId("jobId").build()); - PrismJobManager underTest = prismJobManager(service); - assertThat(underTest.isShutdown()).isFalse(); - JobApi.RunJobResponse runJobResponse = - underTest.run(JobApi.RunJobRequest.newBuilder().setPreparationId("preparationId").build()); - assertThat(underTest.isShutdown()).isFalse(); - assertThat(runJobResponse.getJobId()).isEqualTo("jobId"); - underTest.close(); - assertThat(underTest.isShutdown()).isTrue(); - } - - @Test - public void givenTerminalState_closes() { - PrismJobManager underTest = prismJobManager(new TestJobService()); - assertThat(underTest.isShutdown()).isFalse(); - underTest.onStateChanged(PipelineResult.State.RUNNING); - assertThat(underTest.isShutdown()).isFalse(); - underTest.onStateChanged(PipelineResult.State.RUNNING); - assertThat(underTest.isShutdown()).isFalse(); - underTest.onStateChanged(PipelineResult.State.CANCELLED); - assertThat(underTest.isShutdown()).isTrue(); - - underTest.close(); - } - - private PrismJobManager prismJobManager(TestJobService service) { - String serverName = InProcessServerBuilder.generateName(); - try { - grpcCleanup.register( - InProcessServerBuilder.forName(serverName) - .directExecutor() - .addService(service) - .build() - .start()); - } catch (IOException e) { - throw new RuntimeException(e); - } - - ManagedChannel channel = - grpcCleanup.register(InProcessChannelBuilder.forName(serverName).build()); - - return PrismJobManager.builder() - .setTimeout(Duration.millis(3000L)) - .setEndpoint("ignore") - .setManagedChannel(channel) - .build(); - } - - private static class TestJobService extends JobServiceGrpc.JobServiceImplBase { - - private Optional prepareJobResponse = Optional.empty(); - private Optional runJobResponse = Optional.empty(); - private Optional error = Optional.empty(); - - TestJobService withPrepareJobResponse(JobApi.PrepareJobResponse prepareJobResponse) { - this.prepareJobResponse = Optional.of(prepareJobResponse); - return this; - } - - TestJobService withRunJobResponse(JobApi.RunJobResponse runJobResponse) { - this.runJobResponse = Optional.of(runJobResponse); - return this; - } - - TestJobService withErrorResponse(RuntimeException error) { - this.error = Optional.of(error); - return this; - } - - @Override - public void prepare( - JobApi.PrepareJobRequest request, - StreamObserver responseObserver) { - if (prepareJobResponse.isPresent()) { - responseObserver.onNext(prepareJobResponse.get()); - responseObserver.onCompleted(); - } - if (error.isPresent()) { - responseObserver.onError(error.get()); - } - } - - @Override - public void run( - JobApi.RunJobRequest request, StreamObserver responseObserver) { - if (runJobResponse.isPresent()) { - responseObserver.onNext(runJobResponse.get()); - responseObserver.onCompleted(); - } - if (error.isPresent()) { - responseObserver.onError(error.get()); - } - } - } - - private static RunnerApi.Pipeline pipelineOf() { - Pipeline pipeline = Pipeline.create(); - pipeline.apply(Impulse.create()); - return PipelineTranslation.toProto(pipeline); - } -} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/StateWatcherTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/StateWatcherTest.java deleted file mode 100644 index cfc420046206..000000000000 --- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/StateWatcherTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static com.google.common.truth.Truth.assertThat; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.model.jobmanagement.v1.JobApi; -import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Grpc; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.InsecureServerCredentials; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class StateWatcherTest { - - @Test - public void givenSingleListener_watches() { - Server server = serverOf(PipelineResult.State.RUNNING, PipelineResult.State.DONE); - TestStateListener listener = new TestStateListener(); - try (StateWatcher underTest = StateWatcher.insecure("0.0.0.0:" + server.getPort(), listener)) { - underTest.watch("job-001"); - assertThat(listener.states) - .containsExactly(PipelineResult.State.RUNNING, PipelineResult.State.DONE); - shutdown(server); - } - } - - @Test - public void givenMultipleListeners_watches() { - Server server = serverOf(PipelineResult.State.RUNNING, PipelineResult.State.DONE); - TestStateListener listenerA = new TestStateListener(); - TestStateListener listenerB = new TestStateListener(); - try (StateWatcher underTest = - StateWatcher.insecure("0.0.0.0:" + server.getPort(), listenerA, listenerB)) { - underTest.watch("job-001"); - assertThat(listenerA.states) - .containsExactly(PipelineResult.State.RUNNING, PipelineResult.State.DONE); - assertThat(listenerB.states) - .containsExactly(PipelineResult.State.RUNNING, PipelineResult.State.DONE); - shutdown(server); - } - } - - @Test - public void publishesOnlyChangedState() { - Server server = - serverOf( - PipelineResult.State.RUNNING, - PipelineResult.State.RUNNING, - PipelineResult.State.RUNNING, - PipelineResult.State.RUNNING, - PipelineResult.State.RUNNING, - PipelineResult.State.RUNNING, - PipelineResult.State.RUNNING, - PipelineResult.State.DONE); - TestStateListener listener = new TestStateListener(); - try (StateWatcher underTest = StateWatcher.insecure("0.0.0.0:" + server.getPort(), listener)) { - underTest.watch("job-001"); - assertThat(listener.states) - .containsExactly(PipelineResult.State.RUNNING, PipelineResult.State.DONE); - shutdown(server); - } - } - - private static class TestStateListener implements StateListener { - private final List states = new ArrayList<>(); - - @Override - public void onStateChanged(PipelineResult.State state) { - states.add(state); - } - } - - private static class TestJobServiceStateStream extends JobServiceGrpc.JobServiceImplBase { - private final List states; - - TestJobServiceStateStream(PipelineResult.State... states) { - this.states = Arrays.asList(states); - } - - @Override - public void getStateStream( - JobApi.GetJobStateRequest request, StreamObserver responseObserver) { - for (PipelineResult.State state : states) { - responseObserver.onNext( - JobApi.JobStateEvent.newBuilder() - .setState(JobApi.JobState.Enum.valueOf(state.name())) - .build()); - } - responseObserver.onCompleted(); - } - } - - private static Server serverOf(PipelineResult.State... states) { - try { - return Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create()) - .addService(new TestJobServiceStateStream(states)) - .build() - .start(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private static void shutdown(Server server) { - server.shutdownNow(); - try { - server.awaitTermination(); - } catch (InterruptedException ignored) { - } - } -} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/WorkerServiceTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/WorkerServiceTest.java deleted file mode 100644 index 7fc05d7747cd..000000000000 --- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/WorkerServiceTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.prism; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertThrows; - -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.PortablePipelineOptions; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link WorkerService}. */ -@RunWith(JUnit4.class) -public class WorkerServiceTest { - @Test - public void testStartStop() throws Exception { - PortablePipelineOptions options = - PipelineOptionsFactory.create().as(PortablePipelineOptions.class); - WorkerService underTest = new WorkerService(options); - underTest.start(); - assertThat(underTest.isRunning()).isTrue(); - assertThat(underTest.getApiServiceDescriptorUrl()).matches("localhost:\\d+"); - underTest.stop(); - assertThat(underTest.isRunning()).isFalse(); - } - - @Test - public void givenStarted_updateDefaultEnvironmentConfig() throws Exception { - PortablePipelineOptions options = - PipelineOptionsFactory.create().as(PortablePipelineOptions.class); - assertThat(options.getDefaultEnvironmentConfig()).isNull(); - WorkerService underTest = new WorkerService(options); - underTest.start(); - options = underTest.updateDefaultEnvironmentConfig(options); - assertThat(options.getDefaultEnvironmentConfig()) - .isEqualTo(underTest.getApiServiceDescriptorUrl()); - underTest.stop(); - } - - @Test - public void givenNotStarted_updateDefaultEnvironmentConfig_throws() { - PortablePipelineOptions options = - PipelineOptionsFactory.create().as(PortablePipelineOptions.class); - WorkerService underTest = new WorkerService(options); - assertThrows( - IllegalStateException.class, () -> underTest.updateDefaultEnvironmentConfig(options)); - } - - @Test - public void whenStateIsTerminal_thenStop() throws Exception { - PortablePipelineOptions options = - PipelineOptionsFactory.create().as(PortablePipelineOptions.class); - WorkerService underTest = new WorkerService(options); - assertThat(underTest.isRunning()).isFalse(); - underTest.start(); - assertThat(underTest.isRunning()).isTrue(); - - underTest.onStateChanged(PipelineResult.State.RUNNING); - assertThat(underTest.isRunning()).isTrue(); - - underTest.onStateChanged(PipelineResult.State.RUNNING); - assertThat(underTest.isRunning()).isTrue(); - - underTest.onStateChanged(PipelineResult.State.CANCELLED); - assertThat(underTest.isRunning()).isFalse(); - } -}