Skip to content

Commit

Permalink
Order updates with signals (#639)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Nov 28, 2023
1 parent 78c644a commit 0d746ba
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 130 deletions.
9 changes: 0 additions & 9 deletions .buildkite/docker/Dockerfile

This file was deleted.

5 changes: 0 additions & 5 deletions .buildkite/docker/build.sh

This file was deleted.

27 changes: 0 additions & 27 deletions .buildkite/docker/docker-compose-ci.yaml

This file was deleted.

57 changes: 0 additions & 57 deletions .buildkite/pipeline.yml

This file was deleted.

4 changes: 4 additions & 0 deletions .github/workflows/heavy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ on: # rebuild any PRs and main branch changes
branches:
- master

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
build-and-test:
timeout-minutes: 20
Expand Down
96 changes: 96 additions & 0 deletions .github/workflows/per-pr.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
name: Per merge CI

on: # rebuild any PRs and main branch changes
pull_request:
push:
branches:
- master

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
build-and-test:
name: "Format, docs, and lint"
timeout-minutes: 20
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: recursive
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.74.0
override: true
- name: Install protoc
uses: arduino/setup-protoc@v1
with:
version: '3.x'
repo-token: ${{ secrets.GITHUB_TOKEN }}
- run: rustup component add rustfmt clippy
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- uses: actions-rs/cargo@v1
with:
command: doc
args: --workspace --all-features --no-deps
- uses: actions-rs/cargo@v1
with:
command: lint
- uses: actions-rs/cargo@v1
with:
command: test-lint

test:
name: Unit Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.74.0
override: true
- name: Install protoc
uses: arduino/setup-protoc@v1
with:
version: '3.x'
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:
command: test
args: -- --include-ignored
- uses: actions/upload-artifact@v3
with:
name: coverage-report
path: tarpaulin-report.html
- uses: actions/upload-artifact@v3
with:
name: state-machine-coverage
path: machine_coverage/

fmt:
name: Integ tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.74.0
override: true
- name: Install protoc
uses: arduino/setup-protoc@v1
with:
version: '3.x'
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:
command: integ-test
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ You can build and test the project using cargo:
`cargo build`
`cargo test`

Run integ tests with `cargo integ-test`. You will need to already be running the server:
`docker-compose -f .buildkite/docker/docker-compose.yaml up`
Run integ tests with `cargo integ-test`. By default it will start an ephemeral server. You can also
use an already-running server by passing `-s external`.

Run load tests with `cargo test --features=save_wf_inputs --test heavy_tests`.

Expand Down Expand Up @@ -79,7 +79,7 @@ The passed in options to initialization can be customized to export to an OTel c
To run integ tests with OTel collection on, you can use `integ-with-otel.sh`. You will want to make
sure you are running the collector via docker, which can be done like so:

`docker-compose -f .buildkite/docker/docker-compose.yaml -f .buildkite/docker/docker-compose-telem.yaml up`
`docker-compose -f docker/docker-compose.yaml -f docker/docker-compose-telem.yaml up`

If you are working on a language SDK, you are expected to initialize tracing early in your `main`
equivalent.
Expand Down
16 changes: 4 additions & 12 deletions core-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,17 @@ pub trait Worker: Send + Sync {
/// be one outstanding activation for a particular run of a workflow at any time. If an
/// activation is not responded to, it will cause that workflow to become stuck forever.
///
/// Activations that contain only a `remove_from_cache` job should not cause the workflow code
/// to be invoked and may be responded to with an empty command list. Eviction jobs may also
/// appear with other jobs, but will always appear last in the job list. In this case it is
/// expected that the workflow code will be invoked, and the response produced as normal, but
/// the caller should evict the run after doing so.
/// See [WorkflowActivation] for more details on the expected behavior of lang w.r.t activation
/// & job processing.
///
/// It is rarely a good idea to call poll concurrently. It handles polling the server
/// concurrently internally.
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollWfError>;

/// Ask the worker for some work, returning an [ActivityTask]. It is then the language SDK's
/// responsibility to call the appropriate activity code with the provided inputs. Blocks
/// indefinitely until such work is available or [Worker::shutdown] is called.
///
/// The returned activation is guaranteed to be for the same task queue / worker which was
/// provided as the `task_queue` argument.
///
/// It is rarely a good idea to call poll concurrently. It handles polling the server
/// concurrently internally.
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_activity_task(&self) -> Result<ActivityTask, PollActivityError>;

/// Tell the worker that a workflow activation has completed. May (and should) be freely called
Expand Down
6 changes: 3 additions & 3 deletions core/src/core_tests/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ async fn replay_with_empty_first_task() {
task.jobs.as_slice(),
[
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::StartWorkflow(_)),
variant: Some(workflow_activation_job::Variant::DoUpdate(_)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::DoUpdate(_)),
}
variant: Some(workflow_activation_job::Variant::StartWorkflow(_)),
},
]
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
Expand Down
2 changes: 1 addition & 1 deletion core/src/worker/workflow/managed_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ impl ManagedRun {
} else {
None
};
self.update_to_acts(Ok(maybe_act).map(Into::into))
self.update_to_acts(Ok(maybe_act.into()))
}
/// Returns `true` if autocompletion should be issued, which will actually cause us to end up
/// in [completion] again, at which point we'll start a new heartbeat timeout, which will
Expand Down
31 changes: 27 additions & 4 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ impl LocalActivityRequestSink for LAReqSink {
/// activations must uphold.
///
/// ## Ordering
/// `patches -> signals -> other -X-> queries`
/// `patches -> signals/updates -> other -> queries -> evictions`
///
/// ## Invariants:
/// * Queries always go in their own activation
Expand Down Expand Up @@ -1426,10 +1426,14 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) {
match v {
workflow_activation_job::Variant::NotifyHasPatch(_) => 1,
workflow_activation_job::Variant::SignalWorkflow(_) => 2,
workflow_activation_job::Variant::DoUpdate(_) => 2,
// In principle we should never actually need to sort these with the others, since
// queries always get their own activation, but, maintaining the semantic is
// reasonable.
workflow_activation_job::Variant::QueryWorkflow(_) => 4,
// Also shouldn't ever end up anywhere but the end by construction, but no harm in
// double-checking.
workflow_activation_job::Variant::RemoveFromCache(_) => 5,
_ => 3,
}
}
Expand All @@ -1441,14 +1445,18 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) {
mod tests {
use super::*;
use itertools::Itertools;
use temporal_sdk_core_protos::coresdk::workflow_activation::SignalWorkflow;

#[test]
fn jobs_sort() {
let mut act = WorkflowActivation {
jobs: vec![
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(
Default::default(),
SignalWorkflow {
signal_name: "1".to_string(),
..Default::default()
},
)),
},
WorkflowActivationJob {
Expand All @@ -1466,6 +1474,19 @@ mod tests {
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::DoUpdate(
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(
SignalWorkflow {
signal_name: "2".to_string(),
..Default::default()
},
)),
},
],
..Default::default()
};
Expand All @@ -1479,10 +1500,12 @@ mod tests {
variants.as_slice(),
&[
workflow_activation_job::Variant::NotifyHasPatch(_),
workflow_activation_job::Variant::SignalWorkflow(_),
workflow_activation_job::Variant::SignalWorkflow(ref s1),
workflow_activation_job::Variant::DoUpdate(_),
workflow_activation_job::Variant::SignalWorkflow(ref s2),
workflow_activation_job::Variant::FireTimer(_),
workflow_activation_job::Variant::ResolveActivity(_),
]
] if s1.signal_name == "1" && s2.signal_name == "2"
)
}

Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
error[E0277]: the trait bound `One: From<Two>` is not satisfied
--> tests/trybuild/no_handle_conversions_require_into_fail.rs:11:18
--> tests/trybuild/no_handle_conversions_require_into_fail.rs:11:5
|
11 | Two --(B)--> One;
| ^^^ the trait `From<Two>` is not implemented for `One`
| ^^^ the trait `From<Two>` is not implemented for `One`
|
= note: required for `Two` to implement `Into<One>`
note: required by a bound in `TransitionResult::<Sm, Ds>::from`
Expand Down
Loading

0 comments on commit 0d746ba

Please sign in to comment.