Skip to content

Commit

Permalink
fix(databricks): Check the response body before unmarshal (#5226)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Apr 12, 2024
1 parent c7d1463 commit 6159c27
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,14 @@ func (p Plugin) sendRequest(method string, databricksJob map[string]interface{},
return nil, err
}
var data map[string]interface{}
err = json.Unmarshal(responseBody, &data)
if err != nil {
return nil, fmt.Errorf("failed to parse response with err: [%v]", err)

if len(responseBody) != 0 {
err = json.Unmarshal(responseBody, &data)
if err != nil {
return nil, fmt.Errorf("failed to parse response with err: [%v]", err)
}
}

if resp.StatusCode != http.StatusOK {
message := ""
if v, ok := data["message"]; ok {
Expand All @@ -259,10 +263,16 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase
taskInfo := createTaskInfo(exec.RunID, jobID, exec.DatabricksInstance)
switch lifeCycleState {
// Job response format. https://docs.databricks.com/en/workflows/jobs/jobs-2.0-api.html#runlifecyclestate
case "QUEUED":
return core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, message), nil
case "PENDING":
return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, message, taskInfo), nil
case "RUNNING":
fallthrough
case "BLOCKED":
fallthrough
case "WAITING_FOR_RETRY":
fallthrough
case "TERMINATING":
return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil
case "TERMINATED":
Expand Down

0 comments on commit 6159c27

Please sign in to comment.