Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NOMERGE] Rust SDK future proposal draft #550

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
mod activity_context;
mod app_data;
pub mod interceptors;
pub mod new_activity_defs;

Check warning on line 50 in sdk/src/lib.rs

View workflow job for this annotation

GitHub Actions / build-and-test

missing documentation for a module

Check warning on line 50 in sdk/src/lib.rs

View workflow job for this annotation

GitHub Actions / build-and-test

missing documentation for a module

Check warning on line 50 in sdk/src/lib.rs

View workflow job for this annotation

GitHub Actions / build-and-test

missing documentation for a module

Check warning on line 50 in sdk/src/lib.rs

View workflow job for this annotation

GitHub Actions / Integ tests

missing documentation for a module

Check warning on line 50 in sdk/src/lib.rs

View workflow job for this annotation

GitHub Actions / Integ tests

missing documentation for a module

Check warning on line 50 in sdk/src/lib.rs

View workflow job for this annotation

GitHub Actions / Integ tests

missing documentation for a module

Check warning on line 50 in sdk/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

missing documentation for a module
mod payload_converter;
mod workflow_context;
mod workflow_future;
Expand Down
Empty file added sdk/src/new_activity_defs.rs
Empty file.
22 changes: 22 additions & 0 deletions workflow-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "temporal-workflow-api"
version = "0.1.0"
edition = "2021"
authors = ["Spencer Judge <[email protected]>"]
license-file = "LICENSE.txt"
description = "Temporal Rust SDK Worflow APIs"
homepage = "https://temporal.io/"
repository = "https://github.com/temporalio/sdk-core"
keywords = ["temporal", "workflow"]
categories = ["development-tools"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
futures = "0.3"

[dependencies.temporal-sdk-core-protos]
path = "../sdk-core-protos"
version = "0.1"

43 changes: 43 additions & 0 deletions workflow-api/src/activity_definitions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::WfContext;
use futures::future::BoxFuture;

// #[activity_definition]
type MyActFn = fn(String) -> String;

// Macro enforces types are serializable.

// The biggest problem with activity definitions is they need to be defined in a crate which doesn't
// depend on the entire SDK, because then the workflow code which uses them wouldn't be able to
// be compiled down to WASM. Of course, the issue is activities _aren't_ compiled to WASM, and need
// access to full native functionality. Thus users need to structure their app a bit oddly. They
// can either define all their workflow code & activity _definitions_ in one crate, and then
// depend on that crate from another crate containing their activity implementations / worker, or
// they could make a crate with *just* activity definitions, which is depended on by the workflow
// implementation crate and the worker crate independently. It all makes perfect sense, but is
// maybe a bit annoying in terms of setup - though not really any worse than TS.

// Macro generates this extension & implementation:
//
// The generated code taking `impl Into<Argtype>` is quite nice for ergonomics inside the workflow,
// but might be impossible in some cases, so probably macro would need a flag to turn it off.
pub trait MyActFnWfCtxExt {
// In reality this returns the `CancellableFuture` type from SDK, would also need to move into
// this crate.
fn my_act_fn(
&self,
input: impl Into<String>,
) -> BoxFuture<'static, Result<String, ActivityFail>>;
}
impl MyActFnWfCtxExt for WfContext {
fn my_act_fn(&self, _: impl Into<String>) -> BoxFuture<'static, Result<String, ActivityFail>> {
// Type name is injected in this implementation, taken from macro
todo!()
}
}

// To implement the activity in their implementation crate, the user would do something like:
// worker.register_activity(MyActFn, |input: String| async move { .... });

// Placeholder. Activity failures as can be seen by the WF code.
#[derive(Debug)]
pub struct ActivityFail {}
205 changes: 205 additions & 0 deletions workflow-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
//! This needs to be its own crate so that it doesn't pull in anything that would make compiling
//! to WASM not work. I've already figured out how to do all that once before with my WASM workflows
//! hackathon

mod activity_definitions;

use activity_definitions::MyActFnWfCtxExt;
use futures::future::BoxFuture;
use std::time::Duration;
use temporal_sdk_core_protos::{
coresdk::workflow_commands::ContinueAsNewWorkflowExecution, temporal::api::common::v1::Payload,
};

// anyhow errors are used for the errors returned by user-defined functions. This makes `?` work
// well everywhere by default which is a very nice property, as well as preserving backtraces. We
// may need to define our own error type instead to allow attaching things like the non-retryable
// flag... but I suspect we can just make downcasting work for that.

/// Workflow authors must implement this trait to create Temporal Rust workflows
pub trait Workflow: Sized {
/// Type of the input argument to the workflow
type Input: TemporalDeserializable;
/// Type of the output of the workflow
type Output: TemporalSerializable;
/// The workflow's name
const NAME: &'static str;

/// Called when an instance of a Workflow is first initialized.
///
/// `input` contains the input argument to the workflow as defined by the client who requested
/// the Workflow Execution.
fn new(input: Self::Input, ctx: SafeWfContext) -> Self;

/// Defines the actual workflow logic. The function must return a future, and this future is
/// cached and polled as updates to the workflow history are received.
///
/// `ctx` should be used to perform various Temporal commands like starting timers and
/// activities.
fn run(
&mut self,
ctx: WfContext,
) -> BoxFuture<Result<WfExitValue<Self::Output>, anyhow::Error>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way we can avoid forcing anyhow on users here? Can we just have TemporalFailure enum here and some into() or whatever-similar-to-anyhow helper that's clear it converts to ApplicationFailure?

Copy link
Member Author

@Sushisource Sushisource May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can certainly be our own thing yes, this is actually addressed briefly in a comment near the top


/// All signals this workflow can handle. Typically you won't implement this directly, it will
/// automatically contain all signals defined with the `#[signal]` attribute.
fn signals() -> &'static [&'static SignalDefinition<Self>] {
// TODO
&[]
}
/// All queries this workflow can handle. Typically you won't implement this directly, it will
/// automatically contain all queries defined with the `#[query]` attribute.
fn queries() -> &'static [&'static QueryDefinition<Self>] {
// TODO
&[]
}
}

/// TODO: Exists in SDK in slightly different form, and would move into this crate
#[derive(Debug)]
pub enum WfExitValue<T: TemporalSerializable> {
/// Continue the workflow as a new execution
ContinueAsNew(Box<ContinueAsNewWorkflowExecution>), // Wouldn't be raw proto in reality
/// Confirm the workflow was cancelled
Cancelled,
/// Finish with a result
Normal(T),
}
impl<T: TemporalSerializable> From<T> for WfExitValue<T> {
fn from(v: T) -> Self {
Self::Normal(v)
}
}
// ... also convenience functions for constructing C-A-N, etc.

/// A workflow context which contains only information, but does not allow any commands to
/// be created.
pub struct SafeWfContext {
// TODO
}

/// TODO: Placeholder, exists in SDK and would move into this crate & (likely) become a trait
pub struct WfContext {}
impl WfContext {
pub async fn timer(&self, _: Duration) {
todo!()
}
}

pub struct SignalDefinition<WF: Workflow> {
// TODO: Could be a matching predicate, to allow for dynamic registration
name: String,
// The handler input type must be erased here, since otherwise we couldn't store/return the
// heterogeneous collection of definition types in the workflow itself. The signal macro
// will wrap the user's function with code that performs deserialization.
handler: Box<dyn FnMut(&mut WF, Payload) -> Result<(), anyhow::Error>>,
}
pub struct QueryDefinition<WF: Workflow> {
// TODO: Could be a matching predicate, to allow for dynamic registration
name: String,
// The query macro will wrap the user's function with code that performs deserialization of
// input and serialization of output, as well as error boxing.
handler: Box<dyn FnMut(&WF, Payload) -> Result<Payload, anyhow::Error>>,
}

/// TODO: Placeholders, likely belong inside protos crate. These will be auto-implemented for
/// anything using serde already (which I expect is how virtually everyone will do this).
pub trait TemporalSerializable {}
impl<T> TemporalSerializable for T {}
pub trait TemporalDeserializable {}
impl<T> TemporalDeserializable for T {}

#[cfg(test)]
mod tests {
use super::*;
use futures::FutureExt;
use std::{collections::HashMap, marker::PhantomData};

// Workflow implementation example
struct MyWorkflow {
foo: u64,
bar: HashMap<String, u64>,
}

impl Workflow for MyWorkflow {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
impl Workflow for MyWorkflow {
#[workflow]
impl Workflow {

I wonder if there's value in not requiring trait impl and instead marking the run method with #[workflow_run] to extract input/output types. Only reason I was thinking this is because I kinda like that I can put my signal/query/update functions in the same impl block as my run stuff. But otherwise, I see how it hurts new() (unless you put the attribute there too).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

People are pretty used to multiple impl blocks in Rust. That said, I can get behind a procmacro like this too if it's providing value like auto-naming. It just needs to not do too much magic, because messing up IDEs etc is where things get really bad.

type Input = String;
type Output = u64;
const NAME: &'static str = "MyWorkflowType";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this would be better as a static getter, e.g. fn get_name() -> &str (or maybe String since we should only ask once anyways). If you do leave as const, might as well use the #[workflow] attribute.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it probably should be, if you need to dynamically construct workflow definitions for whatever reason


fn new(input: Self::Input, _ctx: SafeWfContext) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn new(input: Self::Input, _ctx: SafeWfContext) -> Self {
fn new(input: Self::Input, _ctx: PureWorkflowContext) -> Self {

Two suggestions:

  • No more abbreviations (boo "Wf")
  • Something besides safe because I fear it's too generic of an adjective. I don't think "Pure" is that good either tbh, I'd have to think on it. But it's basically "CommandsDisallowedWorkflowContext".

let mut bar = HashMap::new();
bar.insert(input, 10);
Self { foo: 0, bar }
}

fn run(
&mut self,
ctx: WfContext,
Copy link
Member

@cretz cretz May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you do get around to proposal time, I think it'd be worth a discussion of the tradeoff of making this statically accessible via async-local or similar. What I've found in the last two SDKs is that I can just grab the current coroutine scheduler.

Calls like https://docs.rs/tokio/latest/tokio/task/fn.spawn.html#panics are ok assuming the runtime or panicking, I wonder if we can be too. But also I could understand from a function-grouping perspective that having it on a context may be clearer.

One benefit of not having a manually propagated context is that users aren't tempted to store it to call from a query handler or something.

Hrmm, now I wonder if we can have the trait have fn context(&mut self) -> WorkflowContext and fn safe_context(&self) -> SafeWorkflowContext on it. But that doesn't help the new call though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. I would be OK with it I think. This feels somewhat more obvious to me, and I like that you don't have these error paths for "called in some context where it doesn't make sense".

) -> BoxFuture<Result<WfExitValue<Self::Output>, anyhow::Error>> {
async move {
ctx.timer(Duration::from_secs(1)).await;
self.foo = 1;
// See activity definitions file
ctx.my_act_fn("Hi!").await.unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concern here is how to pass in options. One reason that Java's approach of reusing actual activity signature was so rough is that you were forced to make options a separate thing.

// The into() is unfortunately unavoidable without making C-A-N and confirm cancel
// be errors instead. Personally, I don't love that and I think it's not idiomatic
// Rust, whereas needing to `into()` something is. Other way would be macros, but
// it's slightly too much magic I think.
Comment on lines +144 to +147
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concerning the into(), I wonder if maybe instead of Result<WfExitValue<Self::Output>, anyhow::Error> you have Result<Self::Output, TemporalNonSuccess> of some form. That way Ok only represents literal success, not cancellation or continue as new.

I think it's worth optimizing for the normal/happy path of Ok being the literal result and all else being the Err.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is CAN is a form of success. The semantics of it are my real issue. TemporalNonSuccess as a name is a perfect example. CAN isn't "NonSuccess".

Now, to be fair, currently I have WillRespondAsync as an error in activities. My rationale is that's less common than CAN. I get optimizing for the happy path, but into is easy and the compiler error will make it super obvious if the user hasn't done it.

I could live with it, for sure, but it just doesn't feel correct to me or particularly idiomatic for Rust

Copy link
Member

@cretz cretz May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since continue as new has to be invoked top level to use this type anyways, I wonder if you can make it a macro that returns (so they don't have to ? or Err it), e.g. continue_as_new!(). That can kinda hide it. Otherwise, I agree that continue as new and cancel aren't technically errors, it's just a bit rough on the simple path. I can see both arguments here.

Ok(self.foo.into())
}
.boxed()
// TODO: The need to box here is slightly unfortunate, but it's either that or require
// users to depend on `async_trait` (which just hides the same thing). IMO this is the
// best option until more language features stabilize and this can go away.
Comment on lines +151 to +153
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you just require run be an async fn?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, that's the thing that can't work in a trait yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May make it worth doing it attribute style. PyO3 are heavy users of this approach over trait-based (e.g. https://pyo3.rs/v0.18.3/class and https://pyo3.rs/v0.18.3/function).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's just async trait at that point, but if it ends up being useful for other reasons I'm definitely OK with a procmacro

Copy link
Member

@cretz cretz May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like:

#[workflow]
struct MyWorkflow {}

impl MyWorkflow {
    #[workflow_run]
    async fn run(&mut self, some_arg: String) -> TemporalResult<String> {
        // ...
    }
    
    #[workflow_signal]
    async fn my_signal(&mut self, some_arg) {
        // ...
    }
}

Where it all gets interesting is what the caller side looks like in a type safe way which is why I know that traits requiring defined in/out types are helpful.

Copy link
Member

@cretz cretz May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and a #[workflow_init] or #[workflow_new] or whatever named attribute on a func that accepts workflow args to create the struct (required if struct cannot be "default" created)...we built this attribute for .NET constructors to allow for readonly field initialization)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The #[workflow] attribute would just go over the impl block I think. There's no need for it on the struct definition, and that allows you to just require a new and a run method without them needing separate attributes.

Copy link
Member

@cretz cretz May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. May be worth exploring at proposal PR time if there aren't too many drawbacks. If caught between two impls that both may kinda work, just list pros/cons of each and then start one of our classic design Zoom calls to pick a winner :-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mega necropost

Preparing for pres at offsite... I'm now pretty convinced #[workflow] attribute macro is the best way to go for sure. Fixes async problem, makes signals/queries automatically work, enforces new & run... It's the way to go. People who want to could still do it all by hand if they hate it for whatever reason

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it's so pretty!

}
}

// #[workflow] miiiight be necessary here, but, ideally is not.
impl MyWorkflow {
// Attrib commented out since it's nonexistent for now, but that's what it'd look like.
// #[signal]
pub fn my_signal(&mut self, arg: String) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I have the context here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep

self.bar.insert(arg, 1);
}
// #[query]
pub fn my_query(&self, arg: String) -> Option<u64> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I have the "safe" context here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also yep

self.bar.get(&arg).cloned()
}
}

// This would need to be moved into this crate and depended on by client
struct WorkflowHandle<WF: Workflow> {
_d: PhantomData<WF>,
}
struct SignalError; // just a placeholder
struct QueryError; // just a placeholder

// The signal/query macros would generate this trait and impl:
trait MyWorkflowClientExtension {
fn my_signal(&self, arg: String) -> BoxFuture<Result<(), SignalError>>;
fn my_query(&self, arg: String) -> BoxFuture<Result<Option<u64>, QueryError>>;
}
impl MyWorkflowClientExtension for WorkflowHandle<MyWorkflow> {
fn my_signal(&self, arg: String) -> BoxFuture<Result<(), SignalError>> {
// Becomes something like:
// self.signal("my_signal", arg.serialize())
todo!()
}

fn my_query(&self, arg: String) -> BoxFuture<Result<Option<u64>, QueryError>> {
todo!()
}
}

async fn client_example() {
// Now you can use the client like:
// (actually comes from client.start() or client.get_handle() etc)
let wfh = WorkflowHandle {
_d: PhantomData::<MyWorkflow>,
};
let _ = wfh.my_signal("hi!".to_string()).await;
}

#[test]
fn compile() {}
}
Loading