Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#28187] Add standalone prism validates runner precommit #28487

Merged
merged 3 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions .github/workflows/beam_PreCommit_GoPrism.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: PreCommit GoPrism
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you validated that this action passes on a fork of the repo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically no. But I have no reason to believe it won't work.

This file is copied from GoPortable, with GoPortable switched with GoPrism, and the new gradle command. I've been running the new gradle command the whole time to get this working.

I also somehow ended up with a beam repo branch rather than a fork branch.... I don't know how that happened. Something clearly changed with my usual flow. That aside...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll merge this in, and run the dispatch right away, and I'll be ready to disable the auto-runs if that manual kick fails.

Not ideal, but I'm confident.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that failed.

Somehow it doesn't have a go.mod file in the beam/sdks/ directory, so it can't build a prism instance.

Copy link
Contributor

@jrmccluskey jrmccluskey Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When. I was adding the govulncheck action it was funky about that, I had to specify the working directory to be the sdks directory and let it run there rather than specifying the path to the directory from Beam root. Might be a place to start

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I think the issue is that it's building from the beam directory rather than beam/sdks. Changing directories before building the prism binary should sort it out.

Not entirely sure why it works locally for me, since that should be the same as well... interesting.


on:
push:
tags: ['v*']
branches: ['master', 'release-*']
paths: ['model/**', 'sdks/go.**', 'release/**','.github/workflows/beam_PreCommit_GoPrism.yml']
pull_request_target:
branches: ['master', 'release-*']
paths: ['model/**', 'sdks/go.**', 'release/**']
issue_comment:
types: [created]
schedule:
- cron: '0 */6 * * *'
workflow_dispatch:

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.body || github.event.sender.login}}'
cancel-in-progress: true

env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
actions: write
pull-requests: read
checks: read
contents: read
deployments: read
id-token: none
issues: read
discussions: read
packages: read
pages: read
repository-projects: read
security-events: read
statuses: read

jobs:
beam_PreCommit_GoPrism:
name: ${{matrix.job_name}} (${{ matrix.job_phrase }})
runs-on: [self-hosted, ubuntu-20.04, main]
strategy:
matrix:
job_name: [beam_PreCommit_GoPrism]
job_phrase: [Run GoPrism PreCommit]
timeout-minutes: 120
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request_target' ||
github.event_name == 'schedule' ||
github.event_name == 'workflow_dispatch' ||
github.event.comment.body == 'Run GoPrism PreCommit'
steps:
- uses: actions/checkout@v3
- name: Setup repository
uses: ./.github/actions/setup-action
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
- name: Setup self-hosted
uses: ./.github/actions/setup-self-hosted-action
with:
requires-py-39: false
requires-go: false
- name: Run goPrismPreCommit script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :goPrismPreCommit
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ tasks.register("goPortablePreCommit") {
dependsOn(":sdks:go:test:ulrValidatesRunner")
}

tasks.register("goPrismPreCommit") {
dependsOn(":sdks:go:test:prismValidatesRunner")
}

tasks.register("goPostCommitDataflowARM") {
dependsOn(":sdks:go:test:dataflowValidatesRunnerARM64")
}
Expand Down
1 change: 1 addition & 0 deletions sdks/go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ Executing all unit tests for the SDK is possible from the `<beam root>\sdks\go`
To test your change as Jenkins would execute it from a PR, from the
beam root directory, run:
* `./gradlew :sdks:go:goTest` executes the unit tests.
* `./gradlew :sdks:go:test:prismValidatesRunner` validates the SDK against the Go Prism runner as a stand alone binary, with containers.
* `./gradlew :sdks:go:test:ulrValidatesRunner` validates the SDK against the Portable Python runner.
* `./gradlew :sdks:go:test:flinkValidatesRunner` validates the SDK against the Flink runner.

Expand Down
10 changes: 6 additions & 4 deletions sdks/go/cmd/prism/prism.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ import (
)

var (
jobPort = flag.Int("job_port", 8073, "specify the job management service port")
webPort = flag.Int("web_port", 8074, "specify the web ui port")
jobManagerEndpoint = flag.String("jm_override", "", "set to only stand up a web ui that refers to a seperate JobManagement endpoint")
serveHTTP = flag.Bool("serve_http", true, "enable or disable the web ui")
)

func main() {
flag.Parse()
ctx := context.Background()
cli, err := makeJobClient(ctx, *jobManagerEndpoint)
cli, err := makeJobClient(ctx, prism.Options{Port: *jobPort}, *jobManagerEndpoint)
if err != nil {
log.Fatalf("error creating job server: %v", err)
}
if *serveHTTP {
if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: 8074}); err != nil {
if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: *webPort}); err != nil {
log.Fatalf("error creating web server: %v", err)
}
} else {
Expand All @@ -51,15 +53,15 @@ func main() {
}
}

func makeJobClient(ctx context.Context, endpoint string) (jobpb.JobServiceClient, error) {
func makeJobClient(ctx context.Context, opts prism.Options, endpoint string) (jobpb.JobServiceClient, error) {
if endpoint != "" {
clientConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
return nil, fmt.Errorf("error connecting to job server at %v: %v", endpoint, err)
}
return jobpb.NewJobServiceClient(clientConn), nil
}
cli, err := prism.CreateJobServer(ctx, prism.Options{Port: 8073})
cli, err := prism.CreateJobServer(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error creating local job server: %v", err)
}
Expand Down
16 changes: 9 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,15 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
envs = append(envs, credEnv)
}
}

if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil {
// Copy the output, but discard it so we can wait until the image pull is finished.
io.Copy(io.Discard, rc)
rc.Close()
} else {
logger.Warn("unable to pull image", "error", err)
if _, _, err := cli.ImageInspectWithRaw(ctx, dp.GetContainerImage()); err != nil {
// We don't have a local image, so we should pull it.
if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil {
// Copy the output, but discard it so we can wait until the image pull is finished.
io.Copy(io.Discard, rc)
rc.Close()
} else {
logger.Warn("unable to pull image and it's not local", "error", err)
}
}

ccr, err := cli.ContainerCreate(ctx, &container.Config{
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,14 @@ func RunPipeline(j *jobservices.Job) {
// makeWorker creates a worker for that environment.
func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
wk := worker.New(j.String()+"_"+env, env)

wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
wk.PipelineOptions = j.PipelineOptions()
wk.JobKey = j.JobKey()
wk.ArtifactEndpoint = j.ArtifactEndpoint()

go wk.Serve()

if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
return nil, fmt.Errorf("failed to start environment %v for job %v: %w", env, j, err)
}
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (j *Job) ArtifactEndpoint() string {
return j.artifactEndpoint
}

func (j *Job) PipelineOptions() *structpb.Struct {
return j.options
}

// ContributeTentativeMetrics returns the datachannel read index, and any unknown monitoring short ids.
func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) {
return j.metrics.ContributeTentativeMetrics(payloads)
Expand Down
11 changes: 6 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/types/known/structpb"
)

// A W manages worker environments, sending them work
Expand All @@ -59,6 +60,7 @@ type W struct {

JobKey, ArtifactEndpoint string
EnvPb *pipepb.Environment
PipelineOptions *structpb.Struct

// Server management
lis net.Listener
Expand Down Expand Up @@ -163,7 +165,6 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest
}
resp := &fnpb.GetProvisionInfoResponse{
Info: &fnpb.ProvisionInfo{
// TODO: Add the job's Pipeline options
// TODO: Include runner capabilities with the per job configuration.
RunnerCapabilities: []string{
urns.CapabilityMonitoringInfoShortIDs,
Expand All @@ -174,14 +175,14 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest
Url: wk.ArtifactEndpoint,
},

RetrievalToken: wk.JobKey,
Dependencies: wk.EnvPb.GetDependencies(),

// TODO add this job's artifact Dependencies
RetrievalToken: wk.JobKey,
Dependencies: wk.EnvPb.GetDependencies(),
PipelineOptions: wk.PipelineOptions,

Metadata: map[string]string{
"runner": "prism",
"runner_version": core.SdkVersion,
"variant": "test",
},
},
}
Expand Down
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/runners/prism/prism.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
s := jobservices.NewServer(0, internal.RunPipeline)
*jobopts.Endpoint = s.Endpoint()
go s.Serve()
}
if !jobopts.IsLoopback() {
*jobopts.EnvironmentType = "loopback"
if !jobopts.IsLoopback() {
*jobopts.EnvironmentType = "loopback"
}
}
return universal.Execute(ctx, p)
}
Expand Down
24 changes: 24 additions & 0 deletions sdks/go/test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,30 @@ tasks.register("ulrValidatesRunner") {
}
}

// ValidatesRunner tests for Prism. Runs tests in the integration directory
// with prism in docker mod to validate that the runner behaves as expected.
task prismValidatesRunner {
group = "Verification"

dependsOn ":sdks:go:test:goBuild"
dependsOn ":sdks:go:container:docker"
dependsOn ":sdks:java:container:java8:docker"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner prism",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
executable "sh"
args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}"
}
}
}

// A method for configuring a cross-language validates runner test task,
// intended to be used in calls to createCrossLanguageValidatesRunnerTask.
ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
Expand Down
4 changes: 3 additions & 1 deletion sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ package integration
import (
"fmt"
"math/rand"
"os"
"regexp"
"strings"
"testing"
"time"
"os"

// common runner flag.
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
Expand Down Expand Up @@ -140,6 +140,8 @@ var portableFilters = []string{
}

var prismFilters = []string{
// The prism runner does not yet support cross-language.
"TestXLang.*",
// The prism runner does not support the TestStream primitive
"TestTestStream.*",
// The trigger and pane tests uses TestStream
Expand Down
16 changes: 11 additions & 5 deletions sdks/go/test/run_validatesrunner_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,10 @@ print(s.getsockname()[1])
s.close()
"

TMPDIR=$(mktemp -d)

# Set up environment based on runner.
if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$RUNNER" == "portable" ]]; then
if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$RUNNER" == "portable" || "$RUNNER" == "prism" ]]; then
if [[ -z "$ENDPOINT" ]]; then
JOB_PORT=$(python3 -c "$SOCKET_SCRIPT")
ENDPOINT="localhost:$JOB_PORT"
Expand Down Expand Up @@ -288,6 +290,10 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$
python3 \
-m apache_beam.runners.portability.local_job_service_main \
--port $JOB_PORT &
elif [[ "$RUNNER" == "prism" ]]; then
PRISMBIN=$TMPDIR/prismbin
./sdks/go/run_with_go_version.sh build -o $PRISMBIN sdks/go/cmd/prism/*.go
$PRISMBIN --job_port $JOB_PORT &
else
echo "Unknown runner: $RUNNER"
exit 1;
Expand Down Expand Up @@ -340,7 +346,6 @@ if [[ "$RUNNER" == "dataflow" ]]; then
gcloud --version

# ensure gcloud is version 186 or above
TMPDIR=$(mktemp -d)
gcloud_ver=$(gcloud -v | head -1 | awk '{print $4}')
if [[ "$gcloud_ver" < "186" ]]
then
Expand Down Expand Up @@ -402,6 +407,7 @@ fi
ARGS="$ARGS -p $SIMULTANEOUS"

# Assemble test arguments and pipeline options.
ARGS="$ARGS -v"
ARGS="$ARGS -timeout $TIMEOUT"
ARGS="$ARGS --runner=$RUNNER"
ARGS="$ARGS --project=$DATAFLOW_PROJECT"
Expand Down Expand Up @@ -449,9 +455,9 @@ if [[ "$RUNNER" == "dataflow" ]]; then
docker rmi $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to remove container"
gcloud --quiet container images delete $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to delete container"
fi

# Clean up tempdir
rm -rf $TMPDIR
fi

# Clean up tempdir
rm -rf $TMPDIR

exit $TEST_EXIT_CODE