From 2113e954b663ac09a8fe8cfcce8556d69582345c Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Wed, 17 Apr 2024 22:23:04 +0000 Subject: [PATCH 1/3] Connect Web UI requests with backend --- .../beam/runners/prism/internal/execute.go | 2 +- .../prism/internal/jobservices/management.go | 6 +- .../internal/jobservices/management_test.go | 42 +++--- .../prism/internal/jobservices/server_test.go | 11 +- .../prism/internal/web/assets/job-action.js | 120 +++++++++++++++++- .../prism/internal/web/jobdetails.html | 2 +- .../beam/runners/prism/internal/web/web.go | 21 ++- 7 files changed, 166 insertions(+), 38 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 504125a2bd6e..b218d84b891c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -69,7 +69,7 @@ func RunPipeline(j *jobservices.Job) { j.SendMsg("running " + j.String()) j.Running() - if err := executePipeline(j.RootCtx, wks, j); err != nil { + if err := executePipeline(j.RootCtx, wks, j); err != nil && !errors.Is(err, jobservices.ErrCancel) { j.Failed(err) return } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 4cff2ae92e7c..d264af9dfc6a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -243,21 +243,21 @@ func (s *Server) Run(ctx context.Context, req *jobpb.RunJobRequest) (*jobpb.RunJ // Otherwise, returns nil if Job does not exist or the Job's existing state as part of the CancelJobResponse. func (s *Server) Cancel(_ context.Context, req *jobpb.CancelJobRequest) (*jobpb.CancelJobResponse, error) { s.mu.Lock() + defer s.mu.Unlock() job, ok := s.jobs[req.GetJobId()] - s.mu.Unlock() if !ok { return nil, nil } state := job.state.Load().(jobpb.JobState_Enum) switch state { - case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED: + case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED, jobpb.JobState_STOPPED: // Already at terminal state. return &jobpb.CancelJobResponse{ State: state, }, nil } job.SendMsg("canceling " + job.String()) - job.Canceling() + job.Canceled() job.CancelFn(ErrCancel) return &jobpb.CancelJobResponse{ State: jobpb.JobState_CANCELLING, diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go index 176abb8543a3..872309f06790 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go @@ -170,36 +170,34 @@ func TestServer(t *testing.T) { }, }, { - name: "Canceling", - noJobsCheck: func(ctx context.Context, t *testing.T, undertest *Server) { - resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: "job-001"}) - if resp != nil { - t.Errorf("Canceling(\"job-001\") = %s, want nil", resp) - } - if err != nil { - t.Errorf("Canceling(\"job-001\") = %v, want nil", err) - } - }, + name: "Canceling", + noJobsCheck: func(ctx context.Context, t *testing.T, undertest *Server) {}, postPrepCheck: func(ctx context.Context, t *testing.T, undertest *Server) { - resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: "job-001"}) + id := "job-001" + job, ok := undertest.jobs[id] + if !ok { + t.Fatalf("job not found in undertest.jobs: %s", id) + } + job.state.Store(jobpb.JobState_RUNNING) + resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id}) if err != nil { - t.Errorf("Canceling(\"job-001\") = %v, want nil", err) + t.Errorf("Canceling(\"%s\") = %v, want nil", id, err) } if diff := cmp.Diff(&jobpb.CancelJobResponse{ State: jobpb.JobState_CANCELLING, }, resp, cmpOpts...); diff != "" { - t.Errorf("Canceling(\"job-001\") (-want, +got):\n%v", diff) + t.Errorf("Canceling(\"%s\") (-want, +got):\n%v", id, diff) } }, postRunCheck: func(ctx context.Context, t *testing.T, undertest *Server, jobID string) { - resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: jobID}) + resp, err := undertest.GetState(ctx, &jobpb.GetJobStateRequest{JobId: jobID}) if err != nil { - t.Errorf("Canceling(\"%s\") = %v, want nil", jobID, err) + t.Errorf("GetState(\"%s\") = %v, want nil", jobID, err) } - if diff := cmp.Diff(&jobpb.CancelJobResponse{ - State: jobpb.JobState_DONE, - }, resp, cmpOpts...); diff != "" { - t.Errorf("Canceling(\"%s\") (-want, +got):\n%v", jobID, diff) + want := jobpb.JobState_CANCELLED + got := resp.State + if got != want { + t.Errorf("Canceling(\"%s\") = %s, want %s", jobID, got, want) } }, }, @@ -230,7 +228,11 @@ func TestServer(t *testing.T) { shortIDSize: sizeData, }, }) - j.state.Store(jobpb.JobState_DONE) + state := jobpb.JobState_DONE + if j.state.Load() == jobpb.JobState_CANCELLED { + state = jobpb.JobState_CANCELLED + } + j.state.Store(state) called.Done() }) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go index 473c84f958e3..eb921ac4a3a1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go @@ -17,7 +17,6 @@ package jobservices import ( "context" - "errors" "sync" "testing" @@ -84,10 +83,8 @@ func TestServer_RunThenCancel(t *testing.T) { var called sync.WaitGroup called.Add(1) undertest := NewServer(0, func(j *Job) { - if errors.Is(context.Cause(j.RootCtx), ErrCancel) { - j.state.Store(jobpb.JobState_CANCELLED) - called.Done() - } + j.state.Store(jobpb.JobState_RUNNING) + called.Done() }) ctx := context.Background() @@ -118,6 +115,8 @@ func TestServer_RunThenCancel(t *testing.T) { t.Fatalf("server.Run() = returned empty preparation ID, want non-empty") } + called.Wait() + cancelResp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{ JobId: runResp.GetJobId(), }) @@ -128,8 +127,6 @@ func TestServer_RunThenCancel(t *testing.T) { t.Fatalf("server.Canceling() = %v, want %v", cancelResp.State, jobpb.JobState_CANCELLING) } - called.Wait() - stateResp, err := undertest.GetState(ctx, &jobpb.GetJobStateRequest{JobId: runResp.GetJobId()}) if err != nil { t.Fatalf("server.GetState() = %v, want nil", err) diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js b/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js index 999fd22bbf88..4f2e51a7c022 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js +++ b/sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js @@ -12,6 +12,12 @@ limitations under the License. */ +/** + * job-action.js provides UI functionality for taking actions on Prism Jobs via: + * - jobManager: Client for Job Management. + * - uiStateProvider: Encapsulates UI state of user interactive elements on the page. + */ + /** Element class for job action container. */ const JOB_ACTION = '.job-action' @@ -21,6 +27,24 @@ const CANCEL = '.cancel' /** Element class assigned to RUNNING Job state. */ const RUNNING = 'RUNNING' +/** Element class for elements reporting Job state details. */ +const JOB_STATE = '.job-state' + +/** PATH holds consts that map to backend endpoints. */ +const PATH = { + + /** ROOT_ is the Job management path prefix for mapped backend endpoints. */ + ROOT_: '/job', + + /** CANCEL maps to the backend endpoint to cancel a Job. Terminates with '/' to prevent ServeMux 301 redirect. */ + get CANCEL() { + return `${this.ROOT_}/cancel/` + } +} + +/** HTTP related consts. */ +const HTTP_POST = 'POST' + /** * Client for Job Management. * @@ -34,9 +58,28 @@ const jobManager = { * @param jobId * TODO(https://github.com/apache/beam/issues/29669) Send request to backend service. */ - cancel: function(jobId) { + cancel: function (jobId) { console.debug(`cancel button for Job: ${jobId} clicked`) - } + const path = PATH.CANCEL + const request = { + method: HTTP_POST, + body: JSON.stringify(new CancelJobRequest(jobId)) + } + fetch(path, request) + .then(response => { + const requestJson = JSON.stringify(request) + const responseJson = JSON.stringify(response) + if (response.ok) { + console.debug(`Job cancellation request to ${path} of ${requestJson} for Job: ${jobId} sent successfully, response: ${responseJson}`) + uiStateProvider.onJobCancel(response) + } else { + console.error(`Failed to send job cancellation request to ${path} of ${requestJson} for Job: ${jobId}, response: ${responseJson}`) + } + }) + .catch(error => { + console.error(`Error occurred while sending job cancellation request for Job: ${jobId}`, error) + }) + }, } /** @@ -84,14 +127,85 @@ const uiStateProvider = { */ get isStateRunning() { return this.jobAction.classList.contains(RUNNING) + }, + + /** + * Queries the element containing the {@link JOB_STATE} class. + * @return {Element} + */ + get jobStateElement() { + let element = document.querySelector(JOB_STATE) + if (element === null) { + console.error(`no element found at ${JOB_STATE}`) + } + return element + }, + + /** + * Callback for successful Job Cancel requests. + * @param response {Response} + */ + onJobCancel(response) { + response.json().then(json => { + console.debug(`job cancel response json: ${JSON.stringify(json)}`) + uiStateProvider.jobStateElement.textContent = JobState_Enum[json.state] + }) + .catch(error => { + console.error(`error Response.json() ${error}`) + }) } } /** * Attaches an event listener to the window for 'load' events. */ -window.addEventListener("load", function(){ +window.addEventListener("load", function () { console.debug(JOB_ACTION, uiStateProvider.jobAction) console.debug(CANCEL, uiStateProvider.cancelButton) uiStateProvider.init() }) + +/** + * CancelJobRequest models a request to cancel a Job. + * + * Models after its proto namesake in: + * https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto + */ +class CancelJobRequest { + job_id_; + + constructor(jobId) { + this.job_id_ = jobId + } + + /** + * The ID of the Job to cancel. + * @return {string} + */ + get job_id() { + return this.job_id_ + } + + /** toJSON overrides JSON.stringify serialization behavior. */ + toJSON() { + return {job_id: this.job_id} + } +} + +/** Maps JobState_Enum from Job Management server response to the Job State name. See proto for more details: + * https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto + */ +const JobState_Enum = { + 0: "UNSPECIFIED", + 1: "STOPPED", + 2: "RUNNING", + 3: "DONE", + 4: "FAILED", + 5: "CANCELLED", + 6: "UPDATED", + 7: "DRAINING", + 8: "DRAINED", + 9: "STARTING", + 10: "CANCELLING", + 11: "UPDATING", +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html b/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html index ff87f677e2da..955132a03f45 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html +++ b/sdks/go/pkg/beam/runners/prism/internal/web/jobdetails.html @@ -33,7 +33,7 @@ onclick="if (jobManager !== null) { jobManager.cancel('{{.JobID}}') }" >Cancel -
{{.State}}
+
{{.State}}
{{ if .Error}}
{{.Error}}
{{end}} diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/web.go b/sdks/go/pkg/beam/runners/prism/internal/web/web.go index 33c282097538..baa14428aaac 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/web/web.go +++ b/sdks/go/pkg/beam/runners/prism/internal/web/web.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "html/template" + "io" "net/http" "sort" "strings" @@ -374,8 +375,22 @@ type jobCancelHandler struct { func (h *jobCancelHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var cancelRequest *jobpb.CancelJobRequest - if err := json.NewDecoder(r.Body).Decode(&cancelRequest); err != nil { - err = fmt.Errorf("error parsing JSON of request: %w", err) + if r.Method != http.MethodPost { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return + } + body, err := io.ReadAll(r.Body) + if err != nil { + err = fmt.Errorf("could not read request body: %w", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if len(body) == 0 { + http.Error(w, "empty request body", http.StatusBadRequest) + return + } + if err := json.Unmarshal(body, &cancelRequest); err != nil { + err = fmt.Errorf("error parsing JSON: %s of request: %w", body, err) http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -405,10 +420,10 @@ func Initialize(ctx context.Context, port int, jobcli jobpb.JobServiceClient) er mux := http.NewServeMux() mux.Handle("/assets/", assetsFs) + mux.Handle("/job/cancel/", &jobCancelHandler{Jobcli: jobcli}) mux.Handle("/job/", &jobDetailsHandler{Jobcli: jobcli}) mux.Handle("/debugz", &debugzHandler{}) mux.Handle("/", &jobsConsoleHandler{Jobcli: jobcli}) - mux.Handle("/job/cancel", &jobCancelHandler{Jobcli: jobcli}) endpoint := fmt.Sprintf("localhost:%d", port) From d8470d679950f998fe1730a5534bd678f48c30ce Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Fri, 19 Apr 2024 21:29:40 +0000 Subject: [PATCH 2/3] Remove artificial setting of cancelled state --- .../prism/internal/jobservices/management.go | 4 +- .../internal/jobservices/management_test.go | 46 +++++++++++-------- .../prism/internal/jobservices/server_test.go | 13 ++++-- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index d264af9dfc6a..6db94d9500c8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -250,14 +250,14 @@ func (s *Server) Cancel(_ context.Context, req *jobpb.CancelJobRequest) (*jobpb. } state := job.state.Load().(jobpb.JobState_Enum) switch state { - case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED, jobpb.JobState_STOPPED: + case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED: // Already at terminal state. return &jobpb.CancelJobResponse{ State: state, }, nil } job.SendMsg("canceling " + job.String()) - job.Canceled() + job.Canceling() job.CancelFn(ErrCancel) return &jobpb.CancelJobResponse{ State: jobpb.JobState_CANCELLING, diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go index 872309f06790..5aad58b4a86f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management_test.go @@ -46,9 +46,13 @@ func TestServer(t *testing.T) { cmpOpts := []cmp.Option{protocmp.Transform(), cmpopts.EquateEmpty()} tests := []struct { - name string + name string + postRunState jobpb.JobState_Enum + // noJobsCheck tests in the setting that the Job doesn't exist + // postPrepCheck tests after Server Prepare invoked noJobsCheck, postPrepCheck func(context.Context, *testing.T, *Server) - postRunCheck func(context.Context, *testing.T, *Server, string) + // postRunCheck tests after Server Run invoked + postRunCheck func(context.Context, *testing.T, *Server, string) }{ { name: "GetJobs", @@ -170,34 +174,38 @@ func TestServer(t *testing.T) { }, }, { - name: "Canceling", - noJobsCheck: func(ctx context.Context, t *testing.T, undertest *Server) {}, - postPrepCheck: func(ctx context.Context, t *testing.T, undertest *Server) { + name: "Canceling", + postRunState: jobpb.JobState_RUNNING, + noJobsCheck: func(ctx context.Context, t *testing.T, undertest *Server) { id := "job-001" - job, ok := undertest.jobs[id] - if !ok { - t.Fatalf("job not found in undertest.jobs: %s", id) + _, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id}) + // Cancel currently returns nil, nil when Job not found + if err != nil { + t.Errorf("Cancel(%q) = %v, want not found error", id, err) } - job.state.Store(jobpb.JobState_RUNNING) + }, + postPrepCheck: func(ctx context.Context, t *testing.T, undertest *Server) { + id := "job-001" resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id}) if err != nil { - t.Errorf("Canceling(\"%s\") = %v, want nil", id, err) + t.Errorf("Cancel(%q) = %v, want not found error", id, err) } if diff := cmp.Diff(&jobpb.CancelJobResponse{ State: jobpb.JobState_CANCELLING, }, resp, cmpOpts...); diff != "" { - t.Errorf("Canceling(\"%s\") (-want, +got):\n%v", id, diff) + t.Errorf("Cancel(%q) (-want, +got):\n%s", id, diff) } }, postRunCheck: func(ctx context.Context, t *testing.T, undertest *Server, jobID string) { - resp, err := undertest.GetState(ctx, &jobpb.GetJobStateRequest{JobId: jobID}) + id := "job-001" + resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id}) if err != nil { - t.Errorf("GetState(\"%s\") = %v, want nil", jobID, err) + t.Errorf("Cancel(%q) = %v, want not found error", id, err) } - want := jobpb.JobState_CANCELLED - got := resp.State - if got != want { - t.Errorf("Canceling(\"%s\") = %s, want %s", jobID, got, want) + if diff := cmp.Diff(&jobpb.CancelJobResponse{ + State: jobpb.JobState_CANCELLING, + }, resp, cmpOpts...); diff != "" { + t.Errorf("Cancel(%q) (-want, +got):\n%s", id, diff) } }, }, @@ -229,8 +237,8 @@ func TestServer(t *testing.T) { }, }) state := jobpb.JobState_DONE - if j.state.Load() == jobpb.JobState_CANCELLED { - state = jobpb.JobState_CANCELLED + if test.postRunState != jobpb.JobState_UNSPECIFIED { + state = test.postRunState } j.state.Store(state) called.Done() diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go index eb921ac4a3a1..fb72048d478c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server_test.go @@ -17,6 +17,7 @@ package jobservices import ( "context" + "errors" "sync" "testing" @@ -83,8 +84,13 @@ func TestServer_RunThenCancel(t *testing.T) { var called sync.WaitGroup called.Add(1) undertest := NewServer(0, func(j *Job) { + defer called.Done() j.state.Store(jobpb.JobState_RUNNING) - called.Done() + if errors.Is(context.Cause(j.RootCtx), ErrCancel) { + j.SendMsg("pipeline canceled " + j.String()) + j.Canceled() + return + } }) ctx := context.Background() @@ -115,11 +121,10 @@ func TestServer_RunThenCancel(t *testing.T) { t.Fatalf("server.Run() = returned empty preparation ID, want non-empty") } - called.Wait() - cancelResp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{ JobId: runResp.GetJobId(), }) + if err != nil { t.Fatalf("server.Canceling() = %v, want nil", err) } @@ -127,6 +132,8 @@ func TestServer_RunThenCancel(t *testing.T) { t.Fatalf("server.Canceling() = %v, want %v", cancelResp.State, jobpb.JobState_CANCELLING) } + called.Wait() + stateResp, err := undertest.GetState(ctx, &jobpb.GetJobStateRequest{JobId: runResp.GetJobId()}) if err != nil { t.Fatalf("server.GetState() = %v, want nil", err) From 8d209a9c73b98a23adb21d899dc060360d0a5318 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Thu, 2 May 2024 16:43:22 +0000 Subject: [PATCH 3/3] Only lock when acquiring job --- .../pkg/beam/runners/prism/internal/jobservices/management.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 6db94d9500c8..4cff2ae92e7c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -243,8 +243,8 @@ func (s *Server) Run(ctx context.Context, req *jobpb.RunJobRequest) (*jobpb.RunJ // Otherwise, returns nil if Job does not exist or the Job's existing state as part of the CancelJobResponse. func (s *Server) Cancel(_ context.Context, req *jobpb.CancelJobRequest) (*jobpb.CancelJobResponse, error) { s.mu.Lock() - defer s.mu.Unlock() job, ok := s.jobs[req.GetJobId()] + s.mu.Unlock() if !ok { return nil, nil }