Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Prism] Connect Web UI cancel requests with backend #31028

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func RunPipeline(j *jobservices.Job) {
j.SendMsg("running " + j.String())
j.Running()

if err := executePipeline(j.RootCtx, wks, j); err != nil {
if err := executePipeline(j.RootCtx, wks, j); err != nil && !errors.Is(err, jobservices.ErrCancel) {
j.Failed(err)
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ func TestServer(t *testing.T) {

cmpOpts := []cmp.Option{protocmp.Transform(), cmpopts.EquateEmpty()}
tests := []struct {
name string
name string
postRunState jobpb.JobState_Enum
// noJobsCheck tests in the setting that the Job doesn't exist
// postPrepCheck tests after Server Prepare invoked
noJobsCheck, postPrepCheck func(context.Context, *testing.T, *Server)
postRunCheck func(context.Context, *testing.T, *Server, string)
// postRunCheck tests after Server Run invoked
postRunCheck func(context.Context, *testing.T, *Server, string)
}{
{
name: "GetJobs",
Expand Down Expand Up @@ -170,36 +174,38 @@ func TestServer(t *testing.T) {
},
},
{
name: "Canceling",
name: "Canceling",
postRunState: jobpb.JobState_RUNNING,
noJobsCheck: func(ctx context.Context, t *testing.T, undertest *Server) {
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: "job-001"})
if resp != nil {
t.Errorf("Canceling(\"job-001\") = %s, want nil", resp)
}
id := "job-001"
_, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id})
// Cancel currently returns nil, nil when Job not found
if err != nil {
t.Errorf("Canceling(\"job-001\") = %v, want nil", err)
t.Errorf("Cancel(%q) = %v, want not found error", id, err)
}
},
postPrepCheck: func(ctx context.Context, t *testing.T, undertest *Server) {
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: "job-001"})
id := "job-001"
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id})
if err != nil {
t.Errorf("Canceling(\"job-001\") = %v, want nil", err)
t.Errorf("Cancel(%q) = %v, want not found error", id, err)
}
if diff := cmp.Diff(&jobpb.CancelJobResponse{
State: jobpb.JobState_CANCELLING,
}, resp, cmpOpts...); diff != "" {
t.Errorf("Canceling(\"job-001\") (-want, +got):\n%v", diff)
t.Errorf("Cancel(%q) (-want, +got):\n%s", id, diff)
}
},
postRunCheck: func(ctx context.Context, t *testing.T, undertest *Server, jobID string) {
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: jobID})
id := "job-001"
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id})
if err != nil {
t.Errorf("Canceling(\"%s\") = %v, want nil", jobID, err)
t.Errorf("Cancel(%q) = %v, want not found error", id, err)
}
if diff := cmp.Diff(&jobpb.CancelJobResponse{
State: jobpb.JobState_DONE,
State: jobpb.JobState_CANCELLING,
}, resp, cmpOpts...); diff != "" {
t.Errorf("Canceling(\"%s\") (-want, +got):\n%v", jobID, diff)
t.Errorf("Cancel(%q) (-want, +got):\n%s", id, diff)
}
},
},
Expand Down Expand Up @@ -230,7 +236,11 @@ func TestServer(t *testing.T) {
shortIDSize: sizeData,
},
})
j.state.Store(jobpb.JobState_DONE)
state := jobpb.JobState_DONE
if test.postRunState != jobpb.JobState_UNSPECIFIED {
state = test.postRunState
}
j.state.Store(state)
called.Done()
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ func TestServer_RunThenCancel(t *testing.T) {
var called sync.WaitGroup
called.Add(1)
undertest := NewServer(0, func(j *Job) {
defer called.Done()
j.state.Store(jobpb.JobState_RUNNING)
if errors.Is(context.Cause(j.RootCtx), ErrCancel) {
j.state.Store(jobpb.JobState_CANCELLED)
called.Done()
j.SendMsg("pipeline canceled " + j.String())
j.Canceled()
return
}
})
ctx := context.Background()
Expand Down Expand Up @@ -121,6 +124,7 @@ func TestServer_RunThenCancel(t *testing.T) {
cancelResp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{
JobId: runResp.GetJobId(),
})

if err != nil {
t.Fatalf("server.Canceling() = %v, want nil", err)
}
Expand Down
120 changes: 117 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
limitations under the License.
*/

/**
* job-action.js provides UI functionality for taking actions on Prism Jobs via:
* - jobManager: Client for Job Management.
* - uiStateProvider: Encapsulates UI state of user interactive elements on the page.
*/

/** Element class for job action container. */
const JOB_ACTION = '.job-action'

Expand All @@ -21,6 +27,24 @@ const CANCEL = '.cancel'
/** Element class assigned to RUNNING Job state. */
const RUNNING = 'RUNNING'

/** Element class for elements reporting Job state details. */
const JOB_STATE = '.job-state'

/** PATH holds consts that map to backend endpoints. */
const PATH = {

/** ROOT_ is the Job management path prefix for mapped backend endpoints. */
ROOT_: '/job',

/** CANCEL maps to the backend endpoint to cancel a Job. Terminates with '/' to prevent ServeMux 301 redirect. */
get CANCEL() {
return `${this.ROOT_}/cancel/`
}
}

/** HTTP related consts. */
const HTTP_POST = 'POST'

/**
* Client for Job Management.
*
Expand All @@ -34,9 +58,28 @@ const jobManager = {
* @param jobId
* TODO(https://github.com/apache/beam/issues/29669) Send request to backend service.
*/
cancel: function(jobId) {
cancel: function (jobId) {
console.debug(`cancel button for Job: ${jobId} clicked`)
}
const path = PATH.CANCEL
const request = {
method: HTTP_POST,
body: JSON.stringify(new CancelJobRequest(jobId))
}
fetch(path, request)
.then(response => {
const requestJson = JSON.stringify(request)
const responseJson = JSON.stringify(response)
if (response.ok) {
console.debug(`Job cancellation request to ${path} of ${requestJson} for Job: ${jobId} sent successfully, response: ${responseJson}`)
uiStateProvider.onJobCancel(response)
} else {
console.error(`Failed to send job cancellation request to ${path} of ${requestJson} for Job: ${jobId}, response: ${responseJson}`)
}
})
.catch(error => {
console.error(`Error occurred while sending job cancellation request for Job: ${jobId}`, error)
})
},
}

/**
Expand Down Expand Up @@ -84,14 +127,85 @@ const uiStateProvider = {
*/
get isStateRunning() {
return this.jobAction.classList.contains(RUNNING)
},

/**
* Queries the element containing the {@link JOB_STATE} class.
* @return {Element}
*/
get jobStateElement() {
let element = document.querySelector(JOB_STATE)
if (element === null) {
console.error(`no element found at ${JOB_STATE}`)
}
return element
},

/**
* Callback for successful Job Cancel requests.
* @param response {Response}
*/
onJobCancel(response) {
response.json().then(json => {
console.debug(`job cancel response json: ${JSON.stringify(json)}`)
uiStateProvider.jobStateElement.textContent = JobState_Enum[json.state]
})
.catch(error => {
console.error(`error Response.json() ${error}`)
})
}
}

/**
* Attaches an event listener to the window for 'load' events.
*/
window.addEventListener("load", function(){
window.addEventListener("load", function () {
console.debug(JOB_ACTION, uiStateProvider.jobAction)
console.debug(CANCEL, uiStateProvider.cancelButton)
uiStateProvider.init()
})

/**
* CancelJobRequest models a request to cancel a Job.
*
* Models after its proto namesake in:
* https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto
*/
class CancelJobRequest {
job_id_;

constructor(jobId) {
this.job_id_ = jobId
}

/**
* The ID of the Job to cancel.
* @return {string}
*/
get job_id() {
return this.job_id_
}

/** toJSON overrides JSON.stringify serialization behavior. */
toJSON() {
return {job_id: this.job_id}
}
}

/** Maps JobState_Enum from Job Management server response to the Job State name. See proto for more details:
* https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto
*/
const JobState_Enum = {
0: "UNSPECIFIED",
1: "STOPPED",
2: "RUNNING",
3: "DONE",
4: "FAILED",
5: "CANCELLED",
6: "UPDATED",
7: "DRAINING",
8: "DRAINED",
9: "STARTING",
10: "CANCELLING",
11: "UPDATING",
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
onclick="if (jobManager !== null) { jobManager.cancel('{{.JobID}}') }"
>Cancel</button>
</div>
<div>{{.State}}</div>
<div class="job-state">{{.State}}</div>
</header>
<section class="container">
{{ if .Error}}<div class="child">{{.Error}}</div>{{end}}
Expand Down
21 changes: 18 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"html/template"
"io"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -374,8 +375,22 @@ type jobCancelHandler struct {

func (h *jobCancelHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var cancelRequest *jobpb.CancelJobRequest
if err := json.NewDecoder(r.Body).Decode(&cancelRequest); err != nil {
err = fmt.Errorf("error parsing JSON of request: %w", err)
if r.Method != http.MethodPost {
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
err = fmt.Errorf("could not read request body: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if len(body) == 0 {
http.Error(w, "empty request body", http.StatusBadRequest)
return
}
if err := json.Unmarshal(body, &cancelRequest); err != nil {
err = fmt.Errorf("error parsing JSON: %s of request: %w", body, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand Down Expand Up @@ -405,10 +420,10 @@ func Initialize(ctx context.Context, port int, jobcli jobpb.JobServiceClient) er
mux := http.NewServeMux()

mux.Handle("/assets/", assetsFs)
mux.Handle("/job/cancel/", &jobCancelHandler{Jobcli: jobcli})
mux.Handle("/job/", &jobDetailsHandler{Jobcli: jobcli})
mux.Handle("/debugz", &debugzHandler{})
mux.Handle("/", &jobsConsoleHandler{Jobcli: jobcli})
mux.Handle("/job/cancel", &jobCancelHandler{Jobcli: jobcli})

endpoint := fmt.Sprintf("localhost:%d", port)

Expand Down
Loading