diff --git a/apiserver/resources/bob/api.yaml b/apiserver/resources/bob/api.yaml index aa98484d..80abf1fe 100644 --- a/apiserver/resources/bob/api.yaml +++ b/apiserver/resources/bob/api.yaml @@ -334,6 +334,16 @@ paths: success: value: message: "Ok" + "422": + description: Pipeline cannot be paused + content: + application/json: + schema: + $ref: "#/components/schemas/SimpleResponse" + examples: + success: + value: + message: "Pipeline cannot be paused now. Try again when running or stop it." "/pipelines/unpause/groups/{group}/names/{name}/runs/{id}": post: @@ -1184,6 +1194,7 @@ components: StatusResponse: type: string enum: + - initializing - running - passed - failed diff --git a/apiserver/src/apiserver/handlers.clj b/apiserver/src/apiserver/handlers.clj index 630d9d97..dc057a7d 100644 --- a/apiserver/src/apiserver/handlers.clj +++ b/apiserver/src/apiserver/handlers.clj @@ -119,17 +119,25 @@ "" (s/rename-keys pipeline-info {:id :run_id})))) +(defn- pausable? + [db run-id] + (let [{:keys [status]} (crux/entity (crux/db db) (keyword "bob.pipeline.run" run-id))] + (= :running status))) + (defn pipeline-pause-unpause [pause? {{pipeline-info :path} :parameters + db :db queue :queue}] - (exec #(publish queue - (if pause? - "pipeline/pause" - "pipeline/unpause") - "bob.fanout" - "" - (s/rename-keys pipeline-info {:id :run_id})))) + (if (not (pausable? db (:id pipeline-info))) + (respond "Pipeline cannot be paused/is already paused now. Try again when running or stop it." 422) + (exec #(publish queue + (if pause? + "pipeline/pause" + "pipeline/unpause") + "bob.fanout" + "" + (s/rename-keys pipeline-info {:id :run_id}))))) (defn pipeline-logs [{{{:keys [id offset lines]} :path} :parameters @@ -152,14 +160,7 @@ (defn pipeline-status [{{{:keys [id]} :path} :parameters db :db}] - (f/try-all [result (crux/q (crux/db db) - {:find ['(pull run [:status])] - :where [['run :type :pipeline-run] - ['run :crux.db/id (keyword "bob.pipeline.run" id)]]}) - status (->> result - (map first) - (map :status) - first)] + (f/try-all [{:keys [status]} (crux/entity (crux/db db) (keyword "bob.pipeline.run" id))] (if (some? status) (respond status 200) (respond "Cannot find status" 404)) diff --git a/apiserver/test/apiserver/handlers_test.clj b/apiserver/test/apiserver/handlers_test.clj index ecda80ee..7ed844cc 100644 --- a/apiserver/test/apiserver/handlers_test.clj +++ b/apiserver/test/apiserver/handlers_test.clj @@ -124,13 +124,21 @@ (t/is (= "pipeline/start" type))))))) (t/deftest pipeline-fanout-tests - (u/with-system (fn [_ queue] + (u/with-system (fn [db queue] (lq/declare queue "bob.tests" {:exclusive true :auto-delete true :durable false}) (lq/bind queue "bob.tests" "bob.fanout") + (crux/await-tx db + (crux/submit-tx db + [[:crux.tx/put + {:crux.db/id :bob.pipeline.run/a-run-id + :type :pipeline-run + :group "dev" + :name "test" + :status :running}]])) (t/testing "pipeline stop" (h/pipeline-stop {:parameters {:path {:group "dev" :name "test" @@ -147,6 +155,7 @@ {:parameters {:path {:group "dev" :name "test" :id "a-run-id"}} + :db db :queue queue}) (let [{:keys [type data]} (queue-get queue "bob.tests")] (t/is (= {:group "dev" @@ -159,6 +168,7 @@ {:parameters {:path {:group "dev" :name "test" :id "a-run-id"}} + :db db :queue queue}) (let [{:keys [type data]} (queue-get queue "bob.tests")] (t/is (= {:group "dev" @@ -457,3 +467,26 @@ "{:find [(pull f [:type])] :where [[f :type :indian]]})" :t point-in-time}}}))))))))) + +(t/deftest unprocessable-requests + (u/with-system + (fn [db queue] + (t/testing "pipeline cannot be paused when initializing" + (crux/await-tx + db + (crux/submit-tx db + [[:crux.tx/put + {:crux.db/id :bob.pipeline.run/pause-me-if-you-can + :type :pipeline-run + :group "dev" + :name "test" + :status :initializing}]])) + (let [{:keys [body status]} (h/pipeline-pause-unpause true + {:parameters {:path {:group "dev" + :name "test" + :id "pause-me-if-you-can"}} + :db db + :queue queue})] + (t/is (= 422 status)) + (t/is (= "Pipeline cannot be paused/is already paused now. Try again when running or stop it." + (:message body)))))))) diff --git a/runner/src/runner/pipeline.clj b/runner/src/runner/pipeline.clj index d5dc83b0..465ba3e9 100644 --- a/runner/src/runner/pipeline.clj +++ b/runner/src/runner/pipeline.clj @@ -197,12 +197,12 @@ name))) {:keys [image steps vars]} pipeline _ (log/infof "Starting new run: %s" run-id) - txn (crux/submit-tx db-client - [[:crux.tx/put - (assoc run-info - :status :running - :started (Instant/now))]]) - _ (crux/await-tx db-client txn) + _ (crux/await-tx db-client + (crux/submit-tx db-client + [[:crux.tx/put + (assoc run-info + :status :initializing + :started (Instant/now))]])) _ (log-event db-client run-id (str "Pulling image " image)) _ (docker/pull-image image) _ (mark-image-for-gc image run-id) @@ -213,15 +213,21 @@ :env vars :group group :name name} + _ (crux/await-tx db-client + (crux/submit-tx db-client + [[:crux.tx/put + (assoc (run-info-of db-client run-id) + :status + :running)]])) _ (reduce exec-step build-state steps) ;; This is WHOLE of Bob! _ (gc-images run-id) _ (clean-up-run run-id) - txn (crux/submit-tx db-client - [[:crux.tx/put - (assoc (run-info-of db-client run-id) - :status :passed - :completed (Instant/now))]]) - _ (crux/await-tx db-client txn) + _ (crux/await-tx db-client + (crux/submit-tx db-client + [[:crux.tx/put + (assoc (run-info-of db-client run-id) + :status :passed + :completed (Instant/now))]])) _ (log/infof "Run successful %s" run-id) _ (log-event db-client run-id "Run successful")] run-id diff --git a/runner/test/runner/pipeline_test.clj b/runner/test/runner/pipeline_test.clj index 16532525..1f58f706 100644 --- a/runner/test/runner/pipeline_test.clj +++ b/runner/test/runner/pipeline_test.clj @@ -285,13 +285,12 @@ run-info (crux/entity (crux/db db) (keyword (str "bob.pipeline.run/" result))) statuses (->> history (map :crux.db/doc) - (map :status) - (into #{}))] + (map :status))] + (is (= [:passed :running :initializing] + statuses)) (is (not (f/failed? result))) (is (inst? (:started run-info))) - (is (inst? (:completed run-info))) - (is (contains? statuses :running)) - (is (contains? statuses :passed)))))) + (is (inst? (:completed run-info))))))) (testing "failed pipeline run" (u/with-system (fn [db queue]