Skip to content

Commit

Permalink
Stage PrismRunner implementation and dependencies (#31794)
Browse files Browse the repository at this point in the history
* Stage PrismRunner implementation and dependencies

* Fix missing license on package-info

* Update runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java

Co-authored-by: Ahmed Abualsaud <[email protected]>

---------

Co-authored-by: Ahmed Abualsaud <[email protected]>
  • Loading branch information
damondouglas and ahmedabu98 authored Jul 8, 2024
1 parent ac423af commit 631d40d
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 0 deletions.
1 change: 1 addition & 0 deletions runners/prism/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def modDir = project.rootDir.toPath().resolve("sdks")

// prismDir is the directory containing the prism executable.
def prismDir = modDir.resolve("go/cmd/prism")
ext.set('buildTarget', buildTarget)

// Overrides the gradle build task to build the prism executable.
def buildTask = tasks.named("build") {
Expand Down
42 changes: 42 additions & 0 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }

applyJavaNature(
automaticModuleName: 'org.apache.beam.runners.prism',
)

description = "Apache Beam :: Runners :: Prism :: Java"
ext.summary = "Support for executing a pipeline on Prism."

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":runners:portability:java")

implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre

testImplementation library.java.junit
testImplementation library.java.truth
}

tasks.test {
var prismBuildTask = dependsOn(':runners:prism:build')
systemProperty 'prism.buildTarget', prismBuildTask.project.property('buildTarget').toString()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.prism;

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PortablePipelineOptions;

/**
* {@link org.apache.beam.sdk.options.PipelineOptions} for running a {@link
* org.apache.beam.sdk.Pipeline} on the {@link PrismRunner}.
*/
public interface PrismPipelineOptions extends PortablePipelineOptions {
@Description(
"Path or URL to a prism binary, or zipped binary for the current "
+ "platform (Operating System and Architecture). May also be an Apache "
+ "Beam Github Release page URL, with a matching --prismVersionOverride "
+ "set. This option overrides all others for finding a prism binary.")
String getPrismLocation();

void setPrismLocation(String prismLocation);

@Description(
"Override the SDK's version for deriving the Github Release URLs for "
+ "downloading a zipped prism binary, for the current platform. If "
+ "set to a Github Release page URL, then it will use that release page as a base when constructing the download URL.")
String getPrismVersionOverride();

void setPrismVersionOverride(String prismVersionOverride);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.prism;

import org.apache.beam.runners.portability.PortableRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.construction.Environments;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link PipelineRunner} executed on Prism. Downloads, prepares, and executes the Prism service
* on behalf of the developer when {@link PipelineRunner#run}ning the pipeline. If users want to
* submit to an already running Prism service, use the {@link PortableRunner} with the {@link
* PortablePipelineOptions#getJobEndpoint()} option instead. Prism is a {@link
* org.apache.beam.runners.portability.PortableRunner} maintained at <a
* href="https://github.com/apache/beam/tree/master/sdks/go/cmd/prism">sdks/go/cmd/prism</a>.
*/
// TODO(https://github.com/apache/beam/issues/31793): add public modifier after finalizing
// PrismRunner. Depends on: https://github.com/apache/beam/issues/31402 and
// https://github.com/apache/beam/issues/31792.
class PrismRunner extends PipelineRunner<PipelineResult> {

private static final Logger LOG = LoggerFactory.getLogger(PrismRunner.class);

private static final String DEFAULT_PRISM_ENDPOINT = "localhost:8073";

private final PortableRunner internal;
private final PrismPipelineOptions prismPipelineOptions;

private PrismRunner(PortableRunner internal, PrismPipelineOptions prismPipelineOptions) {
this.internal = internal;
this.prismPipelineOptions = prismPipelineOptions;
}

/**
* Invoked from {@link Pipeline#run} where {@link PrismRunner} instantiates using {@link
* PrismPipelineOptions} configuration details.
*/
public static PrismRunner fromOptions(PipelineOptions options) {
PrismPipelineOptions prismPipelineOptions = options.as(PrismPipelineOptions.class);
assignDefaultsIfNeeded(prismPipelineOptions);
PortableRunner internal = PortableRunner.fromOptions(options);
return new PrismRunner(internal, prismPipelineOptions);
}

@Override
public PipelineResult run(Pipeline pipeline) {
LOG.info(
"running Pipeline using {}: defaultEnvironmentType: {}, jobEndpoint: {}",
PortableRunner.class.getName(),
prismPipelineOptions.getDefaultEnvironmentType(),
prismPipelineOptions.getJobEndpoint());

return internal.run(pipeline);
}

private static void assignDefaultsIfNeeded(PrismPipelineOptions prismPipelineOptions) {
if (Strings.isNullOrEmpty(prismPipelineOptions.getDefaultEnvironmentType())) {
prismPipelineOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_LOOPBACK);
}
if (Strings.isNullOrEmpty(prismPipelineOptions.getJobEndpoint())) {
prismPipelineOptions.setJobEndpoint(DEFAULT_PRISM_ENDPOINT);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Support for executing a pipeline on Prism. */
package org.apache.beam.runners.prism;
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.prism;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assume.assumeTrue;

import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link PrismRunner}. */

// TODO(https://github.com/apache/beam/issues/31793): Remove @Ignore after finalizing PrismRunner.
// Depends on: https://github.com/apache/beam/issues/31402 and
// https://github.com/apache/beam/issues/31792.
@Ignore
@RunWith(JUnit4.class)
public class PrismRunnerTest {
// See build.gradle for test task configuration.
private static final String PRISM_BUILD_TARGET_PROPERTY_NAME = "prism.buildTarget";

@Test
public void givenBoundedSource_runsUntilDone() {
Pipeline pipeline = Pipeline.create(options());
pipeline.apply(Create.of(1, 2, 3));
PipelineResult.State state = pipeline.run().waitUntilFinish();
assertThat(state).isEqualTo(PipelineResult.State.DONE);
}

@Test
public void givenUnboundedSource_runsUntilCancel() throws IOException {
Pipeline pipeline = Pipeline.create(options());
pipeline.apply(PeriodicImpulse.create());
PipelineResult result = pipeline.run();
assertThat(result.getState()).isEqualTo(PipelineResult.State.RUNNING);
PipelineResult.State state = result.cancel();
assertThat(state).isEqualTo(PipelineResult.State.CANCELLED);
}

private static PrismPipelineOptions options() {
PrismPipelineOptions opts = PipelineOptionsFactory.create().as(PrismPipelineOptions.class);

opts.setRunner(PrismRunner.class);
opts.setPrismLocation(getLocalPrismBuildOrIgnoreTest());

return opts;
}

/**
* Drives ignoring of tests via checking {@link org.junit.Assume#assumeTrue} that the {@link
* System#getProperty} for {@link #PRISM_BUILD_TARGET_PROPERTY_NAME} is not null or empty.
*/
static String getLocalPrismBuildOrIgnoreTest() {
String command = System.getProperty(PRISM_BUILD_TARGET_PROPERTY_NAME);
assumeTrue(
"System property: "
+ PRISM_BUILD_TARGET_PROPERTY_NAME
+ " is not set; see build.gradle for test task configuration",
!Strings.isNullOrEmpty(command));
return command;
}
}
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ include(":runners:jet")
include(":runners:local-java")
include(":runners:portability:java")
include(":runners:prism")
include(":runners:prism:java")
include(":runners:spark:3")
include(":runners:spark:3:job-server")
include(":runners:spark:3:job-server:container")
Expand Down

0 comments on commit 631d40d

Please sign in to comment.