Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Order updates with signals #639

Merged
merged 9 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions .buildkite/docker/Dockerfile

This file was deleted.

5 changes: 0 additions & 5 deletions .buildkite/docker/build.sh

This file was deleted.

27 changes: 0 additions & 27 deletions .buildkite/docker/docker-compose-ci.yaml

This file was deleted.

57 changes: 0 additions & 57 deletions .buildkite/pipeline.yml

This file was deleted.

4 changes: 4 additions & 0 deletions .github/workflows/heavy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ on: # rebuild any PRs and main branch changes
branches:
- master

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
build-and-test:
timeout-minutes: 20
Expand Down
96 changes: 96 additions & 0 deletions .github/workflows/per-pr.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
name: Per merge CI

on: # rebuild any PRs and main branch changes
pull_request:
push:
branches:
- master

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
build-and-test:
name: "Format, docs, and lint"
timeout-minutes: 20
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: recursive
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.74.0
override: true
- name: Install protoc
uses: arduino/setup-protoc@v1
with:
version: '3.x'
repo-token: ${{ secrets.GITHUB_TOKEN }}
- run: rustup component add rustfmt clippy
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- uses: actions-rs/cargo@v1
with:
command: doc
args: --workspace --all-features --no-deps
- uses: actions-rs/cargo@v1
with:
command: lint
- uses: actions-rs/cargo@v1
with:
command: test-lint

test:
name: Unit Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.74.0
override: true
- name: Install protoc
uses: arduino/setup-protoc@v1
with:
version: '3.x'
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:
command: test
args: -- --include-ignored
- uses: actions/upload-artifact@v3
with:
name: coverage-report
path: tarpaulin-report.html
- uses: actions/upload-artifact@v3
with:
name: state-machine-coverage
path: machine_coverage/

fmt:
name: Integ tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.74.0
override: true
- name: Install protoc
uses: arduino/setup-protoc@v1
with:
version: '3.x'
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: Swatinem/rust-cache@v2
- uses: actions-rs/cargo@v1
with:
command: integ-test
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ You can build and test the project using cargo:
`cargo build`
`cargo test`

Run integ tests with `cargo integ-test`. You will need to already be running the server:
`docker-compose -f .buildkite/docker/docker-compose.yaml up`
Run integ tests with `cargo integ-test`. By default it will start an ephemeral server. You can also
use an already-running server by passing `-s external`.

Run load tests with `cargo test --features=save_wf_inputs --test heavy_tests`.

Expand Down Expand Up @@ -79,7 +79,7 @@ The passed in options to initialization can be customized to export to an OTel c
To run integ tests with OTel collection on, you can use `integ-with-otel.sh`. You will want to make
sure you are running the collector via docker, which can be done like so:

`docker-compose -f .buildkite/docker/docker-compose.yaml -f .buildkite/docker/docker-compose-telem.yaml up`
`docker-compose -f docker/docker-compose.yaml -f docker/docker-compose-telem.yaml up`

If you are working on a language SDK, you are expected to initialize tracing early in your `main`
equivalent.
Expand Down
16 changes: 4 additions & 12 deletions core-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,17 @@ pub trait Worker: Send + Sync {
/// be one outstanding activation for a particular run of a workflow at any time. If an
/// activation is not responded to, it will cause that workflow to become stuck forever.
///
/// Activations that contain only a `remove_from_cache` job should not cause the workflow code
/// to be invoked and may be responded to with an empty command list. Eviction jobs may also
/// appear with other jobs, but will always appear last in the job list. In this case it is
/// expected that the workflow code will be invoked, and the response produced as normal, but
/// the caller should evict the run after doing so.
/// See [WorkflowActivation] for more details on the expected behavior of lang w.r.t activation
/// & job processing.
///
/// It is rarely a good idea to call poll concurrently. It handles polling the server
/// concurrently internally.
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollWfError>;

/// Ask the worker for some work, returning an [ActivityTask]. It is then the language SDK's
/// responsibility to call the appropriate activity code with the provided inputs. Blocks
/// indefinitely until such work is available or [Worker::shutdown] is called.
///
/// The returned activation is guaranteed to be for the same task queue / worker which was
/// provided as the `task_queue` argument.
///
/// It is rarely a good idea to call poll concurrently. It handles polling the server
/// concurrently internally.
/// Do not call poll concurrently. It handles polling the server concurrently internally.
async fn poll_activity_task(&self) -> Result<ActivityTask, PollActivityError>;

/// Tell the worker that a workflow activation has completed. May (and should) be freely called
Expand Down
6 changes: 3 additions & 3 deletions core/src/core_tests/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ async fn replay_with_empty_first_task() {
task.jobs.as_slice(),
[
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::StartWorkflow(_)),
variant: Some(workflow_activation_job::Variant::DoUpdate(_)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::DoUpdate(_)),
}
variant: Some(workflow_activation_job::Variant::StartWorkflow(_)),
},
]
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
Expand Down
2 changes: 1 addition & 1 deletion core/src/worker/workflow/managed_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ impl ManagedRun {
} else {
None
};
self.update_to_acts(Ok(maybe_act).map(Into::into))
self.update_to_acts(Ok(maybe_act.into()))
}
/// Returns `true` if autocompletion should be issued, which will actually cause us to end up
/// in [completion] again, at which point we'll start a new heartbeat timeout, which will
Expand Down
31 changes: 27 additions & 4 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ impl LocalActivityRequestSink for LAReqSink {
/// activations must uphold.
///
/// ## Ordering
/// `patches -> signals -> other -X-> queries`
/// `patches -> signals/updates -> other -> queries -> evictions`
///
/// ## Invariants:
/// * Queries always go in their own activation
Expand Down Expand Up @@ -1426,10 +1426,14 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) {
match v {
workflow_activation_job::Variant::NotifyHasPatch(_) => 1,
workflow_activation_job::Variant::SignalWorkflow(_) => 2,
workflow_activation_job::Variant::DoUpdate(_) => 2,
// In principle we should never actually need to sort these with the others, since
// queries always get their own activation, but, maintaining the semantic is
// reasonable.
workflow_activation_job::Variant::QueryWorkflow(_) => 4,
// Also shouldn't ever end up anywhere but the end by construction, but no harm in
// double-checking.
workflow_activation_job::Variant::RemoveFromCache(_) => 5,
_ => 3,
}
}
Expand All @@ -1441,14 +1445,18 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) {
mod tests {
use super::*;
use itertools::Itertools;
use temporal_sdk_core_protos::coresdk::workflow_activation::SignalWorkflow;

#[test]
fn jobs_sort() {
let mut act = WorkflowActivation {
jobs: vec![
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(
Default::default(),
SignalWorkflow {
signal_name: "1".to_string(),
..Default::default()
Comment on lines +1457 to +1458
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the application code also uses the signal_name "1", do we need to reserve some signal names?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm? This is just test code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry...Duh...

},
)),
},
WorkflowActivationJob {
Expand All @@ -1466,6 +1474,19 @@ mod tests {
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::DoUpdate(
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(
SignalWorkflow {
signal_name: "2".to_string(),
..Default::default()
},
)),
},
],
..Default::default()
};
Expand All @@ -1479,10 +1500,12 @@ mod tests {
variants.as_slice(),
&[
workflow_activation_job::Variant::NotifyHasPatch(_),
workflow_activation_job::Variant::SignalWorkflow(_),
workflow_activation_job::Variant::SignalWorkflow(ref s1),
workflow_activation_job::Variant::DoUpdate(_),
workflow_activation_job::Variant::SignalWorkflow(ref s2),
workflow_activation_job::Variant::FireTimer(_),
workflow_activation_job::Variant::ResolveActivity(_),
]
] if s1.signal_name == "1" && s2.signal_name == "2"
)
}

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
error[E0277]: the trait bound `One: From<Two>` is not satisfied
--> tests/trybuild/no_handle_conversions_require_into_fail.rs:11:18
--> tests/trybuild/no_handle_conversions_require_into_fail.rs:11:5
|
11 | Two --(B)--> One;
| ^^^ the trait `From<Two>` is not implemented for `One`
| ^^^ the trait `From<Two>` is not implemented for `One`
|
= note: required for `Two` to implement `Into<One>`
note: required by a bound in `TransitionResult::<Sm, Ds>::from`
Expand Down
Loading
Loading