From 78c644a0eb0c73978f7c2725bec7cd3a71cdc1eb Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 27 Nov 2023 10:17:11 -0500 Subject: [PATCH 1/2] Fix broken link (#646) --- ARCHITECTURE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 3452e5e5a..0366a64c8 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -66,7 +66,7 @@ async fn hello_activity(name: &str) -> String { We define the interface between the core and lang SDKs in terms of gRPC service definitions. The actual implementations of this "service" are not generated by gRPC generators, but the messages themselves are, and make it easier to hit the ground running in new languages. -See the latest API definition [here](https://github.com/temporalio/sdk-core/tree/master/protos/local/temporal/sdk/core) +See the latest API definition [here](https://github.com/temporalio/sdk-core/tree/master/sdk-core-protos/protos/local/temporal/sdk/core) ## Other topics From 0d746ba65aeff13088f970d1f51147fd17f9386c Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 28 Nov 2023 10:49:08 -0800 Subject: [PATCH 2/2] Order updates with signals (#639) --- .buildkite/docker/Dockerfile | 9 -- .buildkite/docker/build.sh | 5 - .buildkite/docker/docker-compose-ci.yaml | 27 ------ .buildkite/pipeline.yml | 57 ----------- .github/workflows/heavy.yml | 4 + .github/workflows/per-pr.yml | 96 +++++++++++++++++++ README.md | 6 +- core-api/src/lib.rs | 16 +--- core/src/core_tests/updates.rs | 6 +- core/src/worker/workflow/managed_run.rs | 2 +- core/src/worker/workflow/mod.rs | 31 +++++- .../docker-compose-telem.yaml | 0 .../docker => docker}/docker-compose.yaml | 0 ...andle_conversions_require_into_fail.stderr | 4 +- .../workflow_activation.proto | 27 ++++++ sdk-core-protos/src/lib.rs | 6 +- sdk/src/workflow_future.rs | 8 +- tests/integ_tests/update_tests.rs | 4 +- 18 files changed, 178 insertions(+), 130 deletions(-) delete mode 100644 .buildkite/docker/Dockerfile delete mode 100755 .buildkite/docker/build.sh delete mode 100644 .buildkite/docker/docker-compose-ci.yaml delete mode 100644 .buildkite/pipeline.yml create mode 100644 .github/workflows/per-pr.yml rename {.buildkite/docker => docker}/docker-compose-telem.yaml (100%) rename {.buildkite/docker => docker}/docker-compose.yaml (100%) diff --git a/.buildkite/docker/Dockerfile b/.buildkite/docker/Dockerfile deleted file mode 100644 index 1078ce2f0..000000000 --- a/.buildkite/docker/Dockerfile +++ /dev/null @@ -1,9 +0,0 @@ -FROM rust:1.71 - -RUN rustup component add rustfmt && \ - rustup component add clippy - -# RUN cargo install cargo-tarpaulin -RUN apt-get update && apt-get install -y protobuf-compiler - -WORKDIR /sdk-core diff --git a/.buildkite/docker/build.sh b/.buildkite/docker/build.sh deleted file mode 100755 index 602eed46d..000000000 --- a/.buildkite/docker/build.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/env sh -set -e -set -x - -cargo build diff --git a/.buildkite/docker/docker-compose-ci.yaml b/.buildkite/docker/docker-compose-ci.yaml deleted file mode 100644 index 054a1dbe4..000000000 --- a/.buildkite/docker/docker-compose-ci.yaml +++ /dev/null @@ -1,27 +0,0 @@ -version: '3.5' - -services: - unit-test: - build: - context: ../../ - dockerfile: .buildkite/docker/Dockerfile - security_opt: - - seccomp:unconfined - command: /bin/sh -c ".buildkite/docker/build.sh" - environment: - - "USER=unittest" - volumes: - - "../../:/sdk-core" - - integ-test: - build: - context: ../../ - dockerfile: .buildkite/docker/Dockerfile - command: /bin/sh -c ".buildkite/docker/build.sh" - environment: - - "USER=unittest" - - "TEMPORAL_SERVICE_ADDRESS=http://temporal:7233" - depends_on: - - temporal - volumes: - - "../../:/sdk-core" diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml deleted file mode 100644 index 3cea67e59..000000000 --- a/.buildkite/pipeline.yml +++ /dev/null @@ -1,57 +0,0 @@ -steps: - - label: "fmt & doc" - agents: - queue: "default" - docker: "*" - command: "cargo fmt --all -- --check && cargo doc --workspace --all-features --no-deps" - timeout_in_minutes: 15 - plugins: - - docker-compose#v3.0.0: - run: unit-test - config: - - .buildkite/docker/docker-compose.yaml - - .buildkite/docker/docker-compose-ci.yaml - env: - - RUSTDOCFLAGS=-Dwarnings - - label: "lint" - agents: - queue: "default" - docker: "*" - command: "cargo lint && cargo test-lint" - timeout_in_minutes: 15 - plugins: - - docker-compose#v3.0.0: - run: unit-test - config: - - .buildkite/docker/docker-compose.yaml - - .buildkite/docker/docker-compose-ci.yaml - - label: "test" - agents: - queue: "default" - docker: "*" - command: "cargo test -- --include-ignored" - artifact_paths: - - "tarpaulin-report.html" - - "machine_coverage/*" - timeout_in_minutes: 15 - plugins: - - docker-compose#v3.0.0: - run: unit-test - config: - - .buildkite/docker/docker-compose.yaml - - .buildkite/docker/docker-compose-ci.yaml - - label: "integ-test" - agents: - queue: "default" - docker: "*" - env: - INTEG_SERVER_TYPE: docker - command: "cargo test --test integ_tests" - timeout_in_minutes: 15 - plugins: - - docker-compose#v3.0.0: - run: integ-test - config: - - .buildkite/docker/docker-compose.yaml - - .buildkite/docker/docker-compose-ci.yaml - - wait diff --git a/.github/workflows/heavy.yml b/.github/workflows/heavy.yml index d5e2346e4..212094e7d 100644 --- a/.github/workflows/heavy.yml +++ b/.github/workflows/heavy.yml @@ -6,6 +6,10 @@ on: # rebuild any PRs and main branch changes branches: - master +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + jobs: build-and-test: timeout-minutes: 20 diff --git a/.github/workflows/per-pr.yml b/.github/workflows/per-pr.yml new file mode 100644 index 000000000..1041e8773 --- /dev/null +++ b/.github/workflows/per-pr.yml @@ -0,0 +1,96 @@ +name: Per merge CI + +on: # rebuild any PRs and main branch changes + pull_request: + push: + branches: + - master + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + build-and-test: + name: "Format, docs, and lint" + timeout-minutes: 20 + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: recursive + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: 1.74.0 + override: true + - name: Install protoc + uses: arduino/setup-protoc@v1 + with: + version: '3.x' + repo-token: ${{ secrets.GITHUB_TOKEN }} + - run: rustup component add rustfmt clippy + - uses: Swatinem/rust-cache@v2 + - uses: actions-rs/cargo@v1 + with: + command: fmt + args: --all -- --check + - uses: actions-rs/cargo@v1 + with: + command: doc + args: --workspace --all-features --no-deps + - uses: actions-rs/cargo@v1 + with: + command: lint + - uses: actions-rs/cargo@v1 + with: + command: test-lint + + test: + name: Unit Tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: 1.74.0 + override: true + - name: Install protoc + uses: arduino/setup-protoc@v1 + with: + version: '3.x' + repo-token: ${{ secrets.GITHUB_TOKEN }} + - uses: Swatinem/rust-cache@v2 + - uses: actions-rs/cargo@v1 + with: + command: test + args: -- --include-ignored + - uses: actions/upload-artifact@v3 + with: + name: coverage-report + path: tarpaulin-report.html + - uses: actions/upload-artifact@v3 + with: + name: state-machine-coverage + path: machine_coverage/ + + fmt: + name: Integ tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: 1.74.0 + override: true + - name: Install protoc + uses: arduino/setup-protoc@v1 + with: + version: '3.x' + repo-token: ${{ secrets.GITHUB_TOKEN }} + - uses: Swatinem/rust-cache@v2 + - uses: actions-rs/cargo@v1 + with: + command: integ-test \ No newline at end of file diff --git a/README.md b/README.md index 23d3265a7..03decad3c 100644 --- a/README.md +++ b/README.md @@ -48,8 +48,8 @@ You can build and test the project using cargo: `cargo build` `cargo test` -Run integ tests with `cargo integ-test`. You will need to already be running the server: -`docker-compose -f .buildkite/docker/docker-compose.yaml up` +Run integ tests with `cargo integ-test`. By default it will start an ephemeral server. You can also +use an already-running server by passing `-s external`. Run load tests with `cargo test --features=save_wf_inputs --test heavy_tests`. @@ -79,7 +79,7 @@ The passed in options to initialization can be customized to export to an OTel c To run integ tests with OTel collection on, you can use `integ-with-otel.sh`. You will want to make sure you are running the collector via docker, which can be done like so: -`docker-compose -f .buildkite/docker/docker-compose.yaml -f .buildkite/docker/docker-compose-telem.yaml up` +`docker-compose -f docker/docker-compose.yaml -f docker/docker-compose-telem.yaml up` If you are working on a language SDK, you are expected to initialize tracing early in your `main` equivalent. diff --git a/core-api/src/lib.rs b/core-api/src/lib.rs index 47533e9c8..1ef00319f 100644 --- a/core-api/src/lib.rs +++ b/core-api/src/lib.rs @@ -24,25 +24,17 @@ pub trait Worker: Send + Sync { /// be one outstanding activation for a particular run of a workflow at any time. If an /// activation is not responded to, it will cause that workflow to become stuck forever. /// - /// Activations that contain only a `remove_from_cache` job should not cause the workflow code - /// to be invoked and may be responded to with an empty command list. Eviction jobs may also - /// appear with other jobs, but will always appear last in the job list. In this case it is - /// expected that the workflow code will be invoked, and the response produced as normal, but - /// the caller should evict the run after doing so. + /// See [WorkflowActivation] for more details on the expected behavior of lang w.r.t activation + /// & job processing. /// - /// It is rarely a good idea to call poll concurrently. It handles polling the server - /// concurrently internally. + /// Do not call poll concurrently. It handles polling the server concurrently internally. async fn poll_workflow_activation(&self) -> Result; /// Ask the worker for some work, returning an [ActivityTask]. It is then the language SDK's /// responsibility to call the appropriate activity code with the provided inputs. Blocks /// indefinitely until such work is available or [Worker::shutdown] is called. /// - /// The returned activation is guaranteed to be for the same task queue / worker which was - /// provided as the `task_queue` argument. - /// - /// It is rarely a good idea to call poll concurrently. It handles polling the server - /// concurrently internally. + /// Do not call poll concurrently. It handles polling the server concurrently internally. async fn poll_activity_task(&self) -> Result; /// Tell the worker that a workflow activation has completed. May (and should) be freely called diff --git a/core/src/core_tests/updates.rs b/core/src/core_tests/updates.rs index b186916e5..bf0cc72bc 100644 --- a/core/src/core_tests/updates.rs +++ b/core/src/core_tests/updates.rs @@ -32,11 +32,11 @@ async fn replay_with_empty_first_task() { task.jobs.as_slice(), [ WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::DoUpdate(_)), }, WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::DoUpdate(_)), - } + variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + }, ] ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 2fba859ee..2737d718c 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -732,7 +732,7 @@ impl ManagedRun { } else { None }; - self.update_to_acts(Ok(maybe_act).map(Into::into)) + self.update_to_acts(Ok(maybe_act.into())) } /// Returns `true` if autocompletion should be issued, which will actually cause us to end up /// in [completion] again, at which point we'll start a new heartbeat timeout, which will diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 32ac809f7..0f8e838c6 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -1392,7 +1392,7 @@ impl LocalActivityRequestSink for LAReqSink { /// activations must uphold. /// /// ## Ordering -/// `patches -> signals -> other -X-> queries` +/// `patches -> signals/updates -> other -> queries -> evictions` /// /// ## Invariants: /// * Queries always go in their own activation @@ -1426,10 +1426,14 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) { match v { workflow_activation_job::Variant::NotifyHasPatch(_) => 1, workflow_activation_job::Variant::SignalWorkflow(_) => 2, + workflow_activation_job::Variant::DoUpdate(_) => 2, // In principle we should never actually need to sort these with the others, since // queries always get their own activation, but, maintaining the semantic is // reasonable. workflow_activation_job::Variant::QueryWorkflow(_) => 4, + // Also shouldn't ever end up anywhere but the end by construction, but no harm in + // double-checking. + workflow_activation_job::Variant::RemoveFromCache(_) => 5, _ => 3, } } @@ -1441,6 +1445,7 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) { mod tests { use super::*; use itertools::Itertools; + use temporal_sdk_core_protos::coresdk::workflow_activation::SignalWorkflow; #[test] fn jobs_sort() { @@ -1448,7 +1453,10 @@ mod tests { jobs: vec![ WorkflowActivationJob { variant: Some(workflow_activation_job::Variant::SignalWorkflow( - Default::default(), + SignalWorkflow { + signal_name: "1".to_string(), + ..Default::default() + }, )), }, WorkflowActivationJob { @@ -1466,6 +1474,19 @@ mod tests { Default::default(), )), }, + WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::DoUpdate( + Default::default(), + )), + }, + WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::SignalWorkflow( + SignalWorkflow { + signal_name: "2".to_string(), + ..Default::default() + }, + )), + }, ], ..Default::default() }; @@ -1479,10 +1500,12 @@ mod tests { variants.as_slice(), &[ workflow_activation_job::Variant::NotifyHasPatch(_), - workflow_activation_job::Variant::SignalWorkflow(_), + workflow_activation_job::Variant::SignalWorkflow(ref s1), + workflow_activation_job::Variant::DoUpdate(_), + workflow_activation_job::Variant::SignalWorkflow(ref s2), workflow_activation_job::Variant::FireTimer(_), workflow_activation_job::Variant::ResolveActivity(_), - ] + ] if s1.signal_name == "1" && s2.signal_name == "2" ) } diff --git a/.buildkite/docker/docker-compose-telem.yaml b/docker/docker-compose-telem.yaml similarity index 100% rename from .buildkite/docker/docker-compose-telem.yaml rename to docker/docker-compose-telem.yaml diff --git a/.buildkite/docker/docker-compose.yaml b/docker/docker-compose.yaml similarity index 100% rename from .buildkite/docker/docker-compose.yaml rename to docker/docker-compose.yaml diff --git a/fsm/rustfsm_procmacro/tests/trybuild/no_handle_conversions_require_into_fail.stderr b/fsm/rustfsm_procmacro/tests/trybuild/no_handle_conversions_require_into_fail.stderr index 2480f5513..35e804e86 100644 --- a/fsm/rustfsm_procmacro/tests/trybuild/no_handle_conversions_require_into_fail.stderr +++ b/fsm/rustfsm_procmacro/tests/trybuild/no_handle_conversions_require_into_fail.stderr @@ -1,8 +1,8 @@ error[E0277]: the trait bound `One: From` is not satisfied - --> tests/trybuild/no_handle_conversions_require_into_fail.rs:11:18 + --> tests/trybuild/no_handle_conversions_require_into_fail.rs:11:5 | 11 | Two --(B)--> One; - | ^^^ the trait `From` is not implemented for `One` + | ^^^ the trait `From` is not implemented for `One` | = note: required for `Two` to implement `Into` note: required by a bound in `TransitionResult::::from` diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto index 70718729f..2c612b8b8 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto @@ -19,6 +19,33 @@ import "temporal/sdk/core/common/common.proto"; // An instruction to the lang sdk to run some workflow code, whether for the first time or from // a cached state. +// +// ## Job ordering guarantees and semantics +// +// Core will, by default, order jobs within the activation as follows: +// `patches -> signals/updates -> other -> queries -> evictions` +// +// This is because: +// * Patches are expected to apply to the entire activation +// * Signal and update handlers should be invoked before workflow routines are iterated. That is to +// say before the users' main workflow function and anything spawned by it is allowed to continue. +// * Queries always go last (and, in fact, always come in their own activation) +// +// The downside of this reordering is that a signal or update handler may not observe that some +// other event had already happened (ex: an activity completed) when it is first invoked, though it +// will subsequently when workflow routines are driven. Core only does this reordering to make life +// easier for languages that cannot explicitly control when workflow routines are iterated. +// Languages that can explicitly control such iteration should prefer to apply all the jobs to state +// (by resolving promises/futures, invoking handlers, etc as they iterate over the jobs) and then +// only *after* that is done, drive the workflow routines. +// +// ## Evictions +// +// Activations that contain only a `remove_from_cache` job should not cause the workflow code +// to be invoked and may be responded to with an empty command list. Eviction jobs may also +// appear with other jobs, but will always appear last in the job list. In this case it is +// expected that the workflow code will be invoked, and the response produced as normal, but +// the caller should evict the run after doing so. message WorkflowActivation { // The id of the currently active run of the workflow. Also used as a cache key. There may // only ever be one active workflow task (and hence activation) of a run at one time. diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index a0caccf46..8d2e4d0f7 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -24,7 +24,11 @@ pub static ENCODING_PAYLOAD_KEY: &str = "encoding"; pub static JSON_ENCODING_VAL: &str = "json/plain"; pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data"; -#[allow(clippy::large_enum_variant, clippy::derive_partial_eq_without_eq)] +#[allow( + clippy::large_enum_variant, + clippy::derive_partial_eq_without_eq, + clippy::reserve_after_initialization +)] // I'd prefer not to do this, but there are some generated things that just don't need it. #[allow(missing_docs)] pub mod coresdk { diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index 06e91e0da..ed1a85dc6 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -333,11 +333,11 @@ impl Future for WorkflowFuture { // the history has no commands until an update is accepted. Will go away w/ SDK redesign if activation .jobs - .get(0) - .is_some_and(|j| matches!(j.variant, Some(Variant::StartWorkflow(_)))) - && activation.jobs.iter().all(|a| { + .iter() + .any(|j| matches!(j.variant, Some(Variant::StartWorkflow(_)))) + && activation.jobs.iter().all(|j| { matches!( - a.variant, + j.variant, Some(Variant::StartWorkflow(_) | Variant::DoUpdate(_)) ) }) diff --git a/tests/integ_tests/update_tests.rs b/tests/integ_tests/update_tests.rs index 26254d40e..72dd450be 100644 --- a/tests/integ_tests/update_tests.rs +++ b/tests/integ_tests/update_tests.rs @@ -89,10 +89,10 @@ async fn update_workflow(#[values(true, false)] will_fail: bool) { async fn _do_update_workflow(will_fail: bool, core: &dyn Worker) { let res = core.poll_workflow_activation().await.unwrap(); - // On replay, the first activation has start workflow & update, but on first execution, it does + // On replay, the first activation has update & start workflow, but on first execution, it does // not - can happen if update is waiting on some condition. let pid = assert_matches!( - res.jobs.last().unwrap(), + &res.jobs[0], WorkflowActivationJob { variant: Some(workflow_activation_job::Variant::DoUpdate(d)), } => &d.protocol_instance_id