Skip to content

Commit

Permalink
[#102] Add initializing status, return 422 when un-pausable, simplify…
Browse files Browse the repository at this point in the history
… status fetch

Co-authored-by: Em Grasmeder <[email protected]>
  • Loading branch information
lispyclouds and emgrasmeder committed Apr 28, 2021
1 parent b52046d commit 51152c8
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 33 deletions.
11 changes: 11 additions & 0 deletions apiserver/resources/bob/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ paths:
success:
value:
message: "Ok"
"422":
description: Pipeline cannot be paused
content:
application/json:
schema:
$ref: "#/components/schemas/SimpleResponse"
examples:
success:
value:
message: "Pipeline cannot be paused now. Try again when running or stop it."

"/pipelines/unpause/groups/{group}/names/{name}/runs/{id}":
post:
Expand Down Expand Up @@ -1184,6 +1194,7 @@ components:
StatusResponse:
type: string
enum:
- initializing
- running
- passed
- failed
Expand Down
31 changes: 16 additions & 15 deletions apiserver/src/apiserver/handlers.clj
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,25 @@
""
(s/rename-keys pipeline-info {:id :run_id}))))

(defn- pausable?
[db run-id]
(let [{:keys [status]} (crux/entity (crux/db db) (keyword "bob.pipeline.run" run-id))]
(= :running status)))

(defn pipeline-pause-unpause
[pause?
{{pipeline-info :path} :parameters
db :db
queue :queue}]
(exec #(publish queue
(if pause?
"pipeline/pause"
"pipeline/unpause")
"bob.fanout"
""
(s/rename-keys pipeline-info {:id :run_id}))))
(if (not (pausable? db (:id pipeline-info)))
(respond "Pipeline cannot be paused/is already paused now. Try again when running or stop it." 422)
(exec #(publish queue
(if pause?
"pipeline/pause"
"pipeline/unpause")
"bob.fanout"
""
(s/rename-keys pipeline-info {:id :run_id})))))

(defn pipeline-logs
[{{{:keys [id offset lines]} :path} :parameters
Expand All @@ -152,14 +160,7 @@
(defn pipeline-status
[{{{:keys [id]} :path} :parameters
db :db}]
(f/try-all [result (crux/q (crux/db db)
{:find ['(pull run [:status])]
:where [['run :type :pipeline-run]
['run :crux.db/id (keyword "bob.pipeline.run" id)]]})
status (->> result
(map first)
(map :status)
first)]
(f/try-all [{:keys [status]} (crux/entity (crux/db db) (keyword "bob.pipeline.run" id))]
(if (some? status)
(respond status 200)
(respond "Cannot find status" 404))
Expand Down
35 changes: 34 additions & 1 deletion apiserver/test/apiserver/handlers_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,21 @@
(t/is (= "pipeline/start" type)))))))

(t/deftest pipeline-fanout-tests
(u/with-system (fn [_ queue]
(u/with-system (fn [db queue]
(lq/declare queue
"bob.tests"
{:exclusive true
:auto-delete true
:durable false})
(lq/bind queue "bob.tests" "bob.fanout")
(crux/await-tx db
(crux/submit-tx db
[[:crux.tx/put
{:crux.db/id :bob.pipeline.run/a-run-id
:type :pipeline-run
:group "dev"
:name "test"
:status :running}]]))
(t/testing "pipeline stop"
(h/pipeline-stop {:parameters {:path {:group "dev"
:name "test"
Expand All @@ -147,6 +155,7 @@
{:parameters {:path {:group "dev"
:name "test"
:id "a-run-id"}}
:db db
:queue queue})
(let [{:keys [type data]} (queue-get queue "bob.tests")]
(t/is (= {:group "dev"
Expand All @@ -159,6 +168,7 @@
{:parameters {:path {:group "dev"
:name "test"
:id "a-run-id"}}
:db db
:queue queue})
(let [{:keys [type data]} (queue-get queue "bob.tests")]
(t/is (= {:group "dev"
Expand Down Expand Up @@ -457,3 +467,26 @@
"{:find [(pull f [:type])]
:where [[f :type :indian]]})"
:t point-in-time}}})))))))))

(t/deftest unprocessable-requests
(u/with-system
(fn [db queue]
(t/testing "pipeline cannot be paused when initializing"
(crux/await-tx
db
(crux/submit-tx db
[[:crux.tx/put
{:crux.db/id :bob.pipeline.run/pause-me-if-you-can
:type :pipeline-run
:group "dev"
:name "test"
:status :initializing}]]))
(let [{:keys [body status]} (h/pipeline-pause-unpause true
{:parameters {:path {:group "dev"
:name "test"
:id "pause-me-if-you-can"}}
:db db
:queue queue})]
(t/is (= 422 status))
(t/is (= "Pipeline cannot be paused/is already paused now. Try again when running or stop it."
(:message body))))))))
30 changes: 18 additions & 12 deletions runner/src/runner/pipeline.clj
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@
name)))
{:keys [image steps vars]} pipeline
_ (log/infof "Starting new run: %s" run-id)
txn (crux/submit-tx db-client
[[:crux.tx/put
(assoc run-info
:status :running
:started (Instant/now))]])
_ (crux/await-tx db-client txn)
_ (crux/await-tx db-client
(crux/submit-tx db-client
[[:crux.tx/put
(assoc run-info
:status :initializing
:started (Instant/now))]]))
_ (log-event db-client run-id (str "Pulling image " image))
_ (docker/pull-image image)
_ (mark-image-for-gc image run-id)
Expand All @@ -213,15 +213,21 @@
:env vars
:group group
:name name}
_ (crux/await-tx db-client
(crux/submit-tx db-client
[[:crux.tx/put
(assoc (run-info-of db-client run-id)
:status
:running)]]))
_ (reduce exec-step build-state steps) ;; This is WHOLE of Bob!
_ (gc-images run-id)
_ (clean-up-run run-id)
txn (crux/submit-tx db-client
[[:crux.tx/put
(assoc (run-info-of db-client run-id)
:status :passed
:completed (Instant/now))]])
_ (crux/await-tx db-client txn)
_ (crux/await-tx db-client
(crux/submit-tx db-client
[[:crux.tx/put
(assoc (run-info-of db-client run-id)
:status :passed
:completed (Instant/now))]]))
_ (log/infof "Run successful %s" run-id)
_ (log-event db-client run-id "Run successful")]
run-id
Expand Down
9 changes: 4 additions & 5 deletions runner/test/runner/pipeline_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,12 @@
run-info (crux/entity (crux/db db) (keyword (str "bob.pipeline.run/" result)))
statuses (->> history
(map :crux.db/doc)
(map :status)
(into #{}))]
(map :status))]
(is (= [:passed :running :initializing]
statuses))
(is (not (f/failed? result)))
(is (inst? (:started run-info)))
(is (inst? (:completed run-info)))
(is (contains? statuses :running))
(is (contains? statuses :passed))))))
(is (inst? (:completed run-info)))))))

(testing "failed pipeline run"
(u/with-system (fn [db queue]
Expand Down

0 comments on commit 51152c8

Please sign in to comment.