From e7ef96a09342dd88465d99bca7a8322d34943f84 Mon Sep 17 00:00:00 2001 From: Jahvon Dockery Date: Tue, 24 Dec 2024 16:26:41 -0500 Subject: [PATCH] feat: add 'if' conditional processor to serial and parallel execs (#193) --- cmd/internal/store.go | 17 ++- docs/_sidebar.md | 2 + docs/cli/README.md | 2 +- docs/cli/flow_store.md | 12 +- docs/cli/flow_store_clear.md | 4 +- docs/cli/flow_store_get.md | 4 +- docs/cli/flow_store_set.md | 4 +- docs/guide/_sidebar.md | 3 + docs/guide/conditional.md | 126 ++++++++++++++++ docs/guide/state.md | 173 ++++++++++++++++++++++ docs/schemas/flowfile_schema.json | 10 ++ docs/types/flowfile.md | 2 + examples/wip.flow | 23 +++ go.mod | 1 + go.sum | 2 + internal/runner/parallel/parallel.go | 38 ++++- internal/runner/parallel/parallel_test.go | 17 +++ internal/runner/serial/serial.go | 64 ++++++-- internal/runner/serial/serial_test.go | 16 ++ internal/services/expr/expr.go | 97 ++++++++++++ internal/services/expr/expr_test.go | 135 +++++++++++++++++ tests/store_test.go | 2 +- types/executable/executable.gen.go | 32 ++++ types/executable/executable_md.go | 2 +- types/executable/executable_schema.yaml | 26 ++++ 25 files changed, 781 insertions(+), 33 deletions(-) create mode 100644 docs/guide/conditional.md create mode 100644 docs/guide/state.md create mode 100644 internal/services/expr/expr.go create mode 100644 internal/services/expr/expr_test.go diff --git a/cmd/internal/store.go b/cmd/internal/store.go index 890fd71..09fb5bf 100644 --- a/cmd/internal/store.go +++ b/cmd/internal/store.go @@ -16,8 +16,11 @@ import ( func RegisterStoreCmd(ctx *context.Context, rootCmd *cobra.Command) { subCmd := &cobra.Command{ Use: "store", - Short: "Manage the data store.", - Args: cobra.NoArgs, + Short: "Manage the data store for persisting key-value data.", + Long: "Manage the flow data store - a key-value store that persists data within and across executable runs. 
" + + "Values set outside executables persist globally, while values set within executables persist only for " + + "that execution scope.", + Args: cobra.NoArgs, } registerStoreSetCmd(ctx, subCmd) registerStoreGetCmd(ctx, subCmd) @@ -28,7 +31,7 @@ func RegisterStoreCmd(ctx *context.Context, rootCmd *cobra.Command) { func registerStoreSetCmd(ctx *context.Context, rootCmd *cobra.Command) { subCmd := &cobra.Command{ Use: "set KEY [VALUE]", - Short: "Set a key-value pair in the data store.", + Short: "Set a key-value pair in the store.", Long: dataStoreDescription + "This will overwrite any existing value for the key.", Args: cobra.MinimumNArgs(1), Run: func(cmd *cobra.Command, args []string) { @@ -89,7 +92,7 @@ func registerStoreGetCmd(ctx *context.Context, rootCmd *cobra.Command) { subCmd := &cobra.Command{ Use: "get KEY", Aliases: []string{"view"}, - Short: "Get a value from the data store.", + Short: "Get a value from the store by its key.", Long: dataStoreDescription + "This will retrieve the value for the given key.", Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { @@ -125,7 +128,7 @@ func registerStoreClearCmd(ctx *context.Context, rootCmd *cobra.Command) { subCmd := &cobra.Command{ Use: "clear", Aliases: []string{"reset"}, - Short: "Clear the data store.", + Short: "Clear data from the store. 
Use --full to remove all stored data.", Long: dataStoreDescription + "This will remove all keys and values from the data store.", Args: cobra.NoArgs, Run: func(cmd *cobra.Command, args []string) { @@ -142,7 +145,7 @@ func storeClearFunc(ctx *context.Context, cmd *cobra.Command, _ []string) { if err := store.DestroyStore(); err != nil { ctx.Logger.FatalErr(err) } - ctx.Logger.PlainTextSuccess("Data store cleared") + ctx.Logger.PlainTextSuccess("Store store cleared") return } s, err := store.NewStore() @@ -157,7 +160,7 @@ func storeClearFunc(ctx *context.Context, cmd *cobra.Command, _ []string) { if err := s.DeleteBucket(store.EnvironmentBucket()); err != nil { ctx.Logger.FatalErr(err) } - ctx.Logger.PlainTextSuccess("Data store cleared") + ctx.Logger.PlainTextSuccess("Store store cleared") } var dataStoreDescription = "The data store is a key-value store that can be used to persist data across executions. " + diff --git a/docs/_sidebar.md b/docs/_sidebar.md index 8e9a126..e32bd87 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -9,6 +9,8 @@ - [Workspaces](guide/workspace.md "Managing workspaces") - [Executables](guide/executable.md "Managing executables") - [Templating](guide/templating.md "Using flowfile templates") + - [Managing state](guide/state.md "Managing executable state") + - [Conditional execution](guide/conditional.md "Conditional execution") - Reference diff --git a/docs/cli/README.md b/docs/cli/README.md index 4aee135..fbe05df 100644 --- a/docs/cli/README.md +++ b/docs/cli/README.md @@ -24,7 +24,7 @@ See github.com/jahvon/flow for more information. * [flow library](flow_library.md) - View and manage your library of workspaces and executables. * [flow logs](flow_logs.md) - List and view logs for previous flow executions. * [flow secret](flow_secret.md) - Manage flow secrets. -* [flow store](flow_store.md) - Manage the data store. +* [flow store](flow_store.md) - Manage the data store for persisting key-value data. 
* [flow sync](flow_sync.md) - Scan workspaces and update flow cache. * [flow template](flow_template.md) - Manage flowfile templates. * [flow workspace](flow_workspace.md) - Manage flow workspaces. diff --git a/docs/cli/flow_store.md b/docs/cli/flow_store.md index daaf7c7..4728bff 100644 --- a/docs/cli/flow_store.md +++ b/docs/cli/flow_store.md @@ -1,6 +1,10 @@ ## flow store -Manage the data store. +Manage the data store for persisting key-value data. + +### Synopsis + +Manage the flow data store - a key-value store that persists data within and across executable runs. Values set outside executables persist globally, while values set within executables persist only for that execution scope. ### Options @@ -19,7 +23,7 @@ Manage the data store. ### SEE ALSO * [flow](flow.md) - flow is a command line interface designed to make managing and running development workflows easier. -* [flow store clear](flow_store_clear.md) - Clear the data store. -* [flow store get](flow_store_get.md) - Get a value from the data store. -* [flow store set](flow_store_set.md) - Set a key-value pair in the data store. +* [flow store clear](flow_store_clear.md) - Clear data from the store. Use --full to remove all stored data. +* [flow store get](flow_store_get.md) - Get a value from the store by its key. +* [flow store set](flow_store_set.md) - Set a key-value pair in the store. diff --git a/docs/cli/flow_store_clear.md b/docs/cli/flow_store_clear.md index 653247d..2b2e37f 100644 --- a/docs/cli/flow_store_clear.md +++ b/docs/cli/flow_store_clear.md @@ -1,6 +1,6 @@ ## flow store clear -Clear the data store. +Clear data from the store. Use --full to remove all stored data. ### Synopsis @@ -29,5 +29,5 @@ flow store clear [flags] ### SEE ALSO -* [flow store](flow_store.md) - Manage the data store. +* [flow store](flow_store.md) - Manage the data store for persisting key-value data. 
diff --git a/docs/cli/flow_store_get.md b/docs/cli/flow_store_get.md index c9d0ec3..06f09fe 100644 --- a/docs/cli/flow_store_get.md +++ b/docs/cli/flow_store_get.md @@ -1,6 +1,6 @@ ## flow store get -Get a value from the data store. +Get a value from the store by its key. ### Synopsis @@ -28,5 +28,5 @@ flow store get KEY [flags] ### SEE ALSO -* [flow store](flow_store.md) - Manage the data store. +* [flow store](flow_store.md) - Manage the data store for persisting key-value data. diff --git a/docs/cli/flow_store_set.md b/docs/cli/flow_store_set.md index 8d942b8..dd041fb 100644 --- a/docs/cli/flow_store_set.md +++ b/docs/cli/flow_store_set.md @@ -1,6 +1,6 @@ ## flow store set -Set a key-value pair in the data store. +Set a key-value pair in the store. ### Synopsis @@ -28,5 +28,5 @@ flow store set KEY [VALUE] [flags] ### SEE ALSO -* [flow store](flow_store.md) - Manage the data store. +* [flow store](flow_store.md) - Manage the data store for persisting key-value data. diff --git a/docs/guide/_sidebar.md b/docs/guide/_sidebar.md index 5c39a70..f40e8b5 100644 --- a/docs/guide/_sidebar.md +++ b/docs/guide/_sidebar.md @@ -8,3 +8,6 @@ - [Workspaces](workspace.md "Managing workspaces") - [Executables](executable.md "Managing executables") - [Templating](templating.md "Using flowfile templates") + - [Managing state](state.md "Managing executable state") + - [Conditional execution](conditional.md "Conditional execution") + diff --git a/docs/guide/conditional.md b/docs/guide/conditional.md new file mode 100644 index 0000000..a088859 --- /dev/null +++ b/docs/guide/conditional.md @@ -0,0 +1,126 @@ +# Conditional Expressions + +flow CLI uses conditional expressions to control [executable](executable.md) behavior based on runtime conditions. These expressions are written +using a simple expression language that provides access to system information, environment variables, and stored data. 
+ +## Expression Language + +Flow uses the [Expr](https://expr-lang.org) language for evaluating conditions. The language supports common +operators and functions while providing access to flow executable-specific context data. + +**See the [Expr language documentation](https://expr-lang.org/docs/language-definition) for more information on the +expression syntax.** + +### Basic Operators + +The expression language supports standard comparison and logical operators: + +- Comparison: `==`, `!=`, `<`, `>`, `<=`, `>=` +- Logical: `and`, `or`, `not` +- String: `+` (concatenation), `matches` (regex matching) +- Length: `len()` + +### Available Context + +When writing conditions, you have access to several context variables: + +- `os`: Operating system (e.g., "linux", "darwin", "windows") +- `arch`: System architecture (e.g., "amd64", "arm64") +- `ctx`: Flow context information + - `workspace`: Current workspace name + - `namespace`: Current namespace + - `workspacePath`: Path to current workspace + - `flowFilePath`: Path to current flow file + - `flowFileDir`: Directory containing current flow file +- `store`: Key-value map of data store contents +- `env`: Map of environment variables + +## Writing Conditions + +Conditions can be used in various places within flow, most commonly in the `if` field of executable configurations. 
Here are +some examples of common conditional patterns: + +### Operating System and Architecture Checks + +Check for specific operating systems or architectures: + +```yaml +executables: + - verb: install + name: system-specific + serial: + execs: + - if: os == "darwin" + cmd: brew install myapp + - if: os == "linux" + cmd: apt-get install myapp + - if: arch == "amd64" + cmd: make build-amd64 + - if: arch == "arm64" + cmd: make build-arm64 +``` + +### Environment Variable Checks + +Make decisions based on environment variables: + +```yaml +executables: + - verb: deploy + name: env-check + serial: + execs: + - if: env["ENVIRONMENT"] == "production" + cmd: echo "Deploying to production" + - if: env["DEBUG"] == "true" + cmd: echo "Debug mode enabled" +``` + +### Data Store Conditions + +Use stored data to control execution: + +```yaml +executables: + - verb: run + name: data-check + serial: + execs: + - cmd: flow store set feature-flag enabled + - if: store["feature-flag"] == "enabled" + cmd: echo "Feature is enabled" + - if: len(store["optional-key"]) > 0 + cmd: echo "Optional key exists" +``` + +### Complex Conditions + +Combine multiple conditions using logical operators: + +```yaml +executables: + - verb: build + name: complex-check + serial: + execs: + - if: os == "linux" and env["CI"] == "true" + cmd: echo "Running in Linux CI environment" + - if: len(store["build-id"]) > 0 and (os == "darwin" or os == "linux") + cmd: echo "Valid build on Unix-like system" +``` + +### Path and Location Checks + +Use context information to make path-based decisions: + +```yaml +executables: + - verb: setup + name: path-check + serial: + execs: + - if: ctx.workspace == "development" + cmd: echo "development workspace is active" + - if: ctx.flowFileDir matches ".*/scripts$" + cmd: echo "In scripts directory" +``` diff --git a/docs/guide/state.md b/docs/guide/state.md new file mode 100644 index 0000000..7b60858 --- /dev/null +++ b/docs/guide/state.md @@ -0,0 +1,173 @@ +# State 
Management + +flow provides several mechanisms for managing state across [executable](executable.md) runs. This guide explores how to use the data store +and temporary directories to maintain state, as well as how to make execution decisions based on that state. + +## Data Store + +The data store is a key-value store that persists data across executions. It provides a simple way to share information +between executables and maintain state between runs. + +### Store Persistence Scope + +Values in the data store have different persistence scopes depending on where they are set: + +- Values set outside an executable (using the CLI directly) persist across all executions until explicitly cleared +- Values set within an executable persist only across that executable's sub-executables (both serial and parallel) +- All values set within an executable are automatically cleared when the parent executable completes + +### Managing Store Data + +The data store can be managed at a global level in the CLI and within an executable's script. 
Here are the key operations: + +**Setting Values** + +```shell +# Direct CLI usage +flow store set KEY VALUE + +# for example: +flow store set my-key "my value" +# or pipe a value from a command +echo "my value" | flow store set my-key +``` + +**Getting Values** + +```shell +# Direct CLI usage +flow store get KEY + +# for example: +value=$(flow store get my-key) +``` + +**Clearing Values** + +```shell +# Clear all values +flow store clear +# Clear with full flag to remove all stored data +flow store clear --full +``` + +### Using the Store in Executables + +Here's an example of how to use the data store within an executable: + +```yaml +executables: + - verb: run + name: data-store-demo + serial: + params: + execs: + # Set some values in the store + - cmd: | + flow store set user-preference dark-mode + flow store set last-run "$(date)" + # Use those values in a subsequent step + - cmd: | + preference=$(flow store get user-preference) + echo "User preference is: $preference" + echo "Last run: $(flow store get last-run)" +``` + +### Store-Based Conditional Execution + +The data store's contents can be accessed in executable `if` conditions using the `store` context variable. This allows for +dynamic execution paths based on stored values: + +```yaml +executables: + - verb: run + name: conditional-demo + serial: + execs: + - cmd: flow store set feature-enabled true + # This will execute because feature-enabled is set to "true" + - if: store["feature-enabled"] == "true" + cmd: echo "Feature is enabled" + # This will not execute because test-key is not set + - if: len(store["test-key"]) > 0 + cmd: echo "Test key exists" +``` + +See the [Conditional Execution](conditional.md) guide for more examples of using conditions in Flow. + +## Temporary Directories + +Flow provides a special directory reference `f:tmp` that creates an isolated temporary directory for an executable. This +directory is automatically cleaned up when the executable completes. 
+ +### Using Temporary Directories + +To use a temporary directory, set the `dir` field in your executable configuration: + +```yaml +executables: + - verb: build + name: temp-workspace + exec: + dir: f:tmp # Creates and uses a temporary directory + cmd: | + # All commands run in an isolated temp directory + git clone https://github.com/user/repo . + make build +``` + +### Sharing Temporary Files + +While temporary directories are isolated, you can share files between steps in a serial or parallel executable by using +the same temporary directory: + +```yaml +executables: + - verb: process + name: shared-temp + serial: + dir: f:tmp # All sub-executables share this temp directory + execs: + - cmd: echo "Step 1" > output.txt + - cmd: cat output.txt && echo "Step 2" >> output.txt + - cmd: cat output.txt +``` + +## Combining State Management Approaches + +The data store and temporary directories can be used together for more complex state management: + +```yaml +executables: + - verb: build + name: complex-state + serial: + dir: f:tmp + execs: + # Generate a build ID and store it + - cmd: | + build_id=$(date +%Y%m%d_%H%M%S) + flow store set current-build $build_id + # Use the stored build ID for conditional execution + - if: len(store["current-build"]) > 0 + cmd: | + echo "Building artifacts for $(flow store get current-build)" + make build + # Clean up based on stored state + - if: store["cleanup-enabled"] == "true" + cmd: make clean +``` + +### Best Practices + +When managing state in Flow, consider the following: + +1. Use the data store for: + - Sharing simple key-value data between executables + - Storing configuration that needs to persist between runs + - Managing feature flags and conditional execution + +2. 
Use temporary directories for: + - Build and compilation workspaces + - Processing temporary files + - Isolating file operations between different executions diff --git a/docs/schemas/flowfile_schema.json b/docs/schemas/flowfile_schema.json index c7f119d..b1d794c 100644 --- a/docs/schemas/flowfile_schema.json +++ b/docs/schemas/flowfile_schema.json @@ -247,6 +247,11 @@ "type": "string", "default": "" }, + "if": { + "description": "An expression that determines whether the executable should run, using the Expr language syntax. \nThe expression is evaluated at runtime and must resolve to a boolean value. \n\nThe expression has access to OS/architecture information (os, arch), environment variables (env), stored data \n(store), and context information (ctx) like workspace and paths. \n\nFor example, `os == \"darwin\"` will only run on macOS, `len(store[\"feature\"]) \u003e 0` will run if a value exists \nin the store, and `env[\"CI\"] == \"true\"` will run in CI environments. \nSee the [Expr documentation](https://expr-lang.org/docs/language-definition) for more information.\n", + "type": "string", + "default": "" + }, "ref": { "$ref": "#/definitions/ExecutableRef", "description": "A reference to another executable to run in serial.\nOne of `cmd` or `ref` must be set.\n", @@ -475,6 +480,11 @@ "type": "string", "default": "" }, + "if": { + "description": "An expression that determines whether the executable should run, using the Expr language syntax. \nThe expression is evaluated at runtime and must resolve to a boolean value. \n\nThe expression has access to OS/architecture information (os, arch), environment variables (env), stored data \n(store), and context information (ctx) like workspace and paths. \n\nFor example, `os == \"darwin\"` will only run on macOS, `len(store[\"feature\"]) \u003e 0` will run if a value exists \nin the store, and `env[\"CI\"] == \"true\"` will run in CI environments. 
\nSee the [Expr documentation](https://expr-lang.org/docs/language-definition) for more information.\n", + "type": "string", + "default": "" + }, "ref": { "$ref": "#/definitions/ExecutableRef", "description": "A reference to another executable to run in serial.\nOne of `cmd` or `ref` must be set.\n", diff --git a/docs/types/flowfile.md b/docs/types/flowfile.md index fc64107..9954564 100644 --- a/docs/types/flowfile.md +++ b/docs/types/flowfile.md @@ -206,6 +206,7 @@ Configuration for a parallel executable. | ----- | ----------- | ---- | ------- | :--------: | | `args` | Arguments to pass to the executable. | `array` (`string`) | [] | | | `cmd` | The command to execute. One of `cmd` or `ref` must be set. | `string` | | | +| `if` | An expression that determines whether the executable should run, using the Expr language syntax. The expression is evaluated at runtime and must resolve to a boolean value. The expression has access to OS/architecture information (os, arch), environment variables (env), stored data (store), and context information (ctx) like workspace and paths. For example, `os == "darwin"` will only run on macOS, `len(store["feature"]) > 0` will run if a value exists in the store, and `env["CI"] == "true"` will run in CI environments. See the [Expr documentation](https://expr-lang.org/docs/language-definition) for more information. | `string` | | | | `ref` | A reference to another executable to run in serial. One of `cmd` or `ref` must be set. | [ExecutableRef](#ExecutableRef) | | | | `retries` | The number of times to retry the executable if it fails. | `integer` | 0 | | @@ -352,6 +353,7 @@ Configuration for a serial executable. | ----- | ----------- | ---- | ------- | :--------: | | `args` | Arguments to pass to the executable. | `array` (`string`) | [] | | | `cmd` | The command to execute. One of `cmd` or `ref` must be set. | `string` | | | +| `if` | An expression that determines whether the executable should run, using the Expr language syntax. 
The expression is evaluated at runtime and must resolve to a boolean value. The expression has access to OS/architecture information (os, arch), environment variables (env), stored data (store), and context information (ctx) like workspace and paths. For example, `os == "darwin"` will only run on macOS, `len(store["feature"]) > 0` will run if a value exists in the store, and `env["CI"] == "true"` will run in CI environments. See the [Expr documentation](https://expr-lang.org/docs/language-definition) for more information. | `string` | | | | `ref` | A reference to another executable to run in serial. One of `cmd` or `ref` must be set. | [ExecutableRef](#ExecutableRef) | | | | `retries` | The number of times to retry the executable if it fails. | `integer` | 0 | | | `reviewRequired` | If set to true, the user will be prompted to review the output of the executable before continuing. | `boolean` | false | | diff --git a/examples/wip.flow b/examples/wip.flow index ad9859f..7e7b07d 100644 --- a/examples/wip.flow +++ b/examples/wip.flow @@ -20,3 +20,26 @@ executables: echo "wip-test1: $($FLOW store get wip-test1)" echo "wip-test2: $($FLOW store get wip-test2)" echo "wip-test3: $($FLOW store get wip-test3)" + - verb: run + name: stateful-conditions + serial: + params: + - envKey: VAL1 + text: value + - envKey: FLOW + text: ../.bin/flow + execs: + - cmd: | + $FLOW store set wip-test1 hello + echo "$VAL1" | $FLOW store set wip-test2 + - cmd: | + $FLOW store get wip-test1 + $FLOW store get wip-test2 + - if: os == "linux" + cmd: echo "Hello from Linux" + - if: os == "darwin" + cmd: echo "Hello from macOS" + - if: len(store.wip-test1) > 0 + cmd: echo "test1 value is set" + - if: len(store.wip-test4) > 0 + cmd: echo "test4 value is set" diff --git a/go.mod b/go.mod index cc8a0df..1064fe4 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/charmbracelet/bubbles v0.20.0 github.com/charmbracelet/bubbletea v1.2.4 github.com/charmbracelet/lipgloss v1.0.0 + 
github.com/expr-lang/expr v1.16.9 github.com/gen2brain/beeep v0.0.0-20240516210008-9c006672e7f4 github.com/itchyny/gojq v0.12.16 github.com/jahvon/glamour v0.8.1-patch3 diff --git a/go.sum b/go.sum index 00e7ee2..dfd61c1 100644 --- a/go.sum +++ b/go.sum @@ -56,6 +56,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4= github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM= +github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI= +github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/gen2brain/beeep v0.0.0-20240516210008-9c006672e7f4 h1:ygs9POGDQpQGLJPlq4+0LBUmMBNox1N4JSpw+OETcvI= diff --git a/internal/runner/parallel/parallel.go b/internal/runner/parallel/parallel.go index 346a4ad..f9343a4 100644 --- a/internal/runner/parallel/parallel.go +++ b/internal/runner/parallel/parallel.go @@ -1,14 +1,18 @@ package parallel import ( + stdCtx "context" "fmt" "maps" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" "github.com/jahvon/flow/internal/context" "github.com/jahvon/flow/internal/runner" "github.com/jahvon/flow/internal/runner/engine" + "github.com/jahvon/flow/internal/services/expr" + "github.com/jahvon/flow/internal/services/store" argUtils "github.com/jahvon/flow/internal/utils/args" execUtils "github.com/jahvon/flow/internal/utils/executables" "github.com/jahvon/flow/types/executable" @@ -43,7 +47,15 @@ func (r *parallelRunner) Exec( } if len(parallelSpec.Execs) > 0 { - return handleExec(ctx, e, eng, parallelSpec, 
inputEnv) + str, err := store.NewStore() + if err != nil { + return err + } + defer str.Close() + if err := str.CreateBucket(store.EnvironmentBucket()); err != nil { + return err + } + return handleExec(ctx, e, eng, parallelSpec, inputEnv, str) } return fmt.Errorf("no parallel executables to run") @@ -53,9 +65,33 @@ func handleExec( ctx *context.Context, parent *executable.Executable, eng engine.Engine, parallelSpec *executable.ParallelExecutableType, promptedEnv map[string]string, + str store.Store, ) error { + groupCtx, cancel := stdCtx.WithCancel(ctx.Ctx) + defer cancel() + group, _ := errgroup.WithContext(groupCtx) + limit := parallelSpec.MaxThreads + if limit == 0 { + limit = len(parallelSpec.Execs) + } + group.SetLimit(limit) + + dm, err := str.GetAll() + if err != nil { + return err + } + dataMap := expr.ExpressionEnv(ctx, parent, dm, promptedEnv) + var execs []engine.Exec for i, refConfig := range parallelSpec.Execs { + if refConfig.If != "" { + if truthy, err := expr.IsTruthy(refConfig.If, &dataMap); err != nil { + return err + } else if !truthy { + ctx.Logger.Debugf("skipping execution %d/%d", i+1, len(parallelSpec.Execs)) + continue + } + } var exec *executable.Executable switch { case len(refConfig.Ref) > 0: diff --git a/internal/runner/parallel/parallel_test.go b/internal/runner/parallel/parallel_test.go index fe248b4..c725635 100644 --- a/internal/runner/parallel/parallel_test.go +++ b/internal/runner/parallel/parallel_test.go @@ -130,5 +130,22 @@ var _ = Describe("ParallelRunner", func() { Return(results).Times(1) Expect(parallelRnr.Exec(ctx.Ctx, rootExec, mockEngine, promptedEnv)).ToNot(Succeed()) }) + + It("should skip execution when condition is false", func() { + parallelSpec := rootExec.Parallel + parallelSpec.Execs[0].If = "false" + parallelSpec.Execs[1].If = "true" + mockCache := ctx.ExecutableCache + for i, e := range subExecs { + if i == 1 { + mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) + } + } + 
results := engine.ResultSummary{Results: []engine.Result{{}}} + mockEngine.EXPECT(). + Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(results).Times(1) + Expect(parallelRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string))).To(Succeed()) + }) }) }) diff --git a/internal/runner/serial/serial.go b/internal/runner/serial/serial.go index e56ddc3..8a45c64 100644 --- a/internal/runner/serial/serial.go +++ b/internal/runner/serial/serial.go @@ -11,6 +11,8 @@ import ( "github.com/jahvon/flow/internal/context" "github.com/jahvon/flow/internal/runner" "github.com/jahvon/flow/internal/runner/engine" + "github.com/jahvon/flow/internal/services/expr" + "github.com/jahvon/flow/internal/services/store" argUtils "github.com/jahvon/flow/internal/utils/args" execUtils "github.com/jahvon/flow/internal/utils/executables" "github.com/jahvon/flow/types/executable" @@ -45,7 +47,15 @@ func (r *serialRunner) Exec( } if len(serialSpec.Execs) > 0 { - return handleExec(ctx, e, eng, serialSpec, inputEnv) + str, err := store.NewStore() + if err != nil { + return err + } + defer str.Close() + if err := str.CreateBucket(store.EnvironmentBucket()); err != nil { + return err + } + return handleExec(ctx, e, eng, serialSpec, inputEnv, str) } return fmt.Errorf("no serial executables to run") } @@ -56,9 +66,27 @@ func handleExec( eng engine.Engine, serialSpec *executable.SerialExecutableType, promptedEnv map[string]string, + str store.Store, ) error { + dm, err := str.GetAll() + if err != nil { + return err + } + dataMap := expr.ExpressionEnv(ctx, parent, dm, promptedEnv) + var execs []engine.Exec for i, refConfig := range serialSpec.Execs { + if refConfig.If != "" { + truthy, err := expr.IsTruthy(refConfig.If, &dataMap) + if err != nil { + return err + } + if !truthy { + ctx.Logger.Debugf("skipping execution %d/%d", i+1, len(serialSpec.Execs)) + continue + } + ctx.Logger.Debugf("condition %s is true", refConfig.If) + } var exec *executable.Executable 
switch { case refConfig.Ref != "": @@ -87,17 +115,7 @@ func handleExec( exec.Exec.SetLogFields(fields) runExec := func() error { - err := runner.Exec(ctx, exec, eng, execPromptedEnv) - if err != nil { - return err - } - if i < len(serialSpec.Execs) && refConfig.ReviewRequired { - ctx.Logger.Println("Do you want to proceed with the next execution? (y/n)") - if !inputConfirmed(os.Stdin) { - return fmt.Errorf("stopping runner early (%d/%d)", i+1, len(serialSpec.Execs)) - } - } - return nil + return runSerialExecFunc(ctx, i, refConfig, exec, eng, execPromptedEnv, serialSpec) } execs = append(execs, engine.Exec{ID: exec.Ref().String(), Function: runExec, MaxRetries: refConfig.Retries}) @@ -109,6 +127,28 @@ func handleExec( return nil } +func runSerialExecFunc( + ctx *context.Context, + step int, + refConfig executable.SerialRefConfig, + exec *executable.Executable, + eng engine.Engine, + execPromptedEnv map[string]string, + serialSpec *executable.SerialExecutableType, +) error { + err := runner.Exec(ctx, exec, eng, execPromptedEnv) + if err != nil { + return err + } + if step < len(serialSpec.Execs) && refConfig.ReviewRequired { + ctx.Logger.Println("Do you want to proceed with the next execution? 
(y/n)") + if !inputConfirmed(os.Stdin) { + return fmt.Errorf("stopping runner early (%d/%d)", step+1, len(serialSpec.Execs)) + } + } + return nil +} + func inputConfirmed(in *os.File) bool { var response string _, _ = fmt.Fscanf(in, response) diff --git a/internal/runner/serial/serial_test.go b/internal/runner/serial/serial_test.go index 56ec1ce..bb4d95f 100644 --- a/internal/runner/serial/serial_test.go +++ b/internal/runner/serial/serial_test.go @@ -122,5 +122,21 @@ var _ = Describe("SerialRunner", func() { mockEngine.EXPECT().Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(results).Times(1) Expect(serialRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string))).ToNot(Succeed()) }) + + It("should skip execution when condition is false", func() { + serialSpec := rootExec.Serial + serialSpec.Execs[0].If = "false" + serialSpec.Execs[1].If = "true" + mockCache := ctx.ExecutableCache + for i, e := range subExecs { + if i == 1 { + mockCache.EXPECT().GetExecutableByRef(ctx.Logger, e.Ref()).Return(e, nil).Times(1) + } + } + results := engine.ResultSummary{Results: []engine.Result{{}}} + mockEngine.EXPECT().Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). 
+ Return(results).Times(1) + Expect(serialRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string))).To(Succeed()) + }) }) }) diff --git a/internal/services/expr/expr.go b/internal/services/expr/expr.go new file mode 100644 index 0000000..69aea8b --- /dev/null +++ b/internal/services/expr/expr.go @@ -0,0 +1,97 @@ +package expr + +import ( + "fmt" + "path/filepath" + "runtime" + "strconv" + "strings" + + "github.com/expr-lang/expr" + + "github.com/jahvon/flow/internal/context" + "github.com/jahvon/flow/types/executable" +) + +func IsTruthy(ex string, env *ExpressionData) (bool, error) { + output, err := Evaluate(ex, env) + if err != nil { + return false, err + } + + switch v := output.(type) { + case bool: + return v, nil + case int, int64, float64, uint, uint64: + return v != 0, nil + case string: + truthy, err := strconv.ParseBool(strings.Trim(v, `"' `)) + if err != nil { + return false, err + } + return truthy, nil + default: + return false, nil + } +} + +func Evaluate(ex string, env *ExpressionData) (interface{}, error) { + program, err := expr.Compile(ex, expr.Env(env)) + if err != nil { + return nil, err + } + + output, err := expr.Run(program, env) + if err != nil { + return nil, err + } + return output, nil +} + +func EvaluateString(ex string, env *ExpressionData) (string, error) { + output, err := Evaluate(ex, env) + if err != nil { + return "", err + } + str, ok := output.(string) + if !ok { + return "", fmt.Errorf("expected string, got %T", output) + } + return str, nil +} + +type CtxData struct { + Workspace string `expr:"workspace"` + Namespace string `expr:"namespace"` + WorkspacePath string `expr:"workspacePath"` + FlowFilePath string `expr:"flowFilePath"` + FlowFileDir string `expr:"flowFileDir"` +} + +type ExpressionData struct { + OS string `expr:"os"` + Arch string `expr:"arch"` + Ctx *CtxData `expr:"ctx"` + Store map[string]string `expr:"store"` + Env map[string]string `expr:"env"` +} + +func ExpressionEnv( + ctx *context.Context, + 
executable *executable.Executable, + dataMap, envMap map[string]string, +) ExpressionData { + return ExpressionData{ + OS: runtime.GOOS, + Arch: runtime.GOARCH, + Ctx: &CtxData{ + Workspace: ctx.CurrentWorkspace.AssignedName(), + Namespace: ctx.CurrentWorkspace.AssignedName(), + WorkspacePath: executable.WorkspacePath(), + FlowFilePath: executable.FlowFilePath(), + FlowFileDir: filepath.Dir(executable.FlowFilePath()), + }, + Store: dataMap, + Env: envMap, + } +} diff --git a/internal/services/expr/expr_test.go b/internal/services/expr/expr_test.go new file mode 100644 index 0000000..74773d5 --- /dev/null +++ b/internal/services/expr/expr_test.go @@ -0,0 +1,135 @@ +package expr_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/jahvon/flow/internal/services/expr" +) + +func TestExpr(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Expr Suite") +} + +var _ = Describe("Expr", func() { + Describe("IsTruthy", func() { + It("should evaluate truthy expressions correctly", func() { + tests := []struct { + expr string + env *expr.ExpressionData + expected bool + }{ + {"true", nil, true}, + {"false", nil, false}, + {"1", nil, true}, + {"0", nil, false}, + {`"true"`, nil, true}, + {`"false"`, nil, false}, + } + + for _, test := range tests { + result, err := expr.IsTruthy(test.expr, test.env) + Expect(err).NotTo(HaveOccurred()) + By("testing expression: " + test.expr) + Expect(result).To(Equal(test.expected)) + } + }) + }) + + Describe("Evaluate", func() { + It("should evaluate expressions correctly", func() { + tests := []struct { + expr string + env *expr.ExpressionData + expected interface{} + }{ + {"1 + 1", nil, 2}, + {"true && false", nil, false}, + {`"hello" + " " + "world"`, nil, "hello world"}, + } + + for _, test := range tests { + result, err := expr.Evaluate(test.expr, test.env) + Expect(err).NotTo(HaveOccurred()) + By("testing expression: " + test.expr) + Expect(result).To(Equal(test.expected)) + } 
+ }) + }) + + Describe("EvaluateString", func() { + It("should evaluate string expressions correctly", func() { + tests := []struct { + expr string + env *expr.ExpressionData + expected string + }{ + {`"hello"`, nil, "hello"}, + {`"foo" + "bar"`, nil, "foobar"}, + } + + for _, test := range tests { + result, err := expr.EvaluateString(test.expr, test.env) + Expect(err).NotTo(HaveOccurred()) + By("testing expression: " + test.expr) + Expect(result).To(Equal(test.expected)) + } + }) + }) + + var _ = Describe("ExpressionData", func() { + var ( + data *expr.ExpressionData + ) + + BeforeEach(func() { + data = &expr.ExpressionData{ + OS: "linux", + Arch: "amd64", + Ctx: &expr.CtxData{ + Workspace: "test_workspace", + Namespace: "test_namespace", + WorkspacePath: "/path/to/workspace", + FlowFilePath: "/path/to/flowfile", + FlowFileDir: "/path/to", + }, + Store: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + Env: map[string]string{ + "ENV_VAR1": "env_value1", + "ENV_VAR2": "env_value2", + }, + } + }) + + Describe("Evaluate complex expressions", func() { + It("should evaluate various expressions correctly", func() { + tests := []struct { + expr string + expected interface{} + }{ + {"1 + 1", 2}, + {"true && false", false}, + {`"hello" + " " + "world"`, "hello world"}, + {`store["key1"]`, "value1"}, + {`env["ENV_VAR1"]`, "env_value1"}, + {`os == "linux"`, true}, + {`arch == "amd64"`, true}, + {`ctx.workspace == "test_workspace"`, true}, + } + + for _, test := range tests { + result, err := expr.Evaluate(test.expr, data) + Expect(err).NotTo(HaveOccurred()) + By("testing expression: " + test.expr) + Expect(result).To(Equal(test.expected)) + } + }) + }) + }) +}) diff --git a/tests/store_test.go b/tests/store_test.go index 539d536..9f86882 100644 --- a/tests/store_test.go +++ b/tests/store_test.go @@ -54,7 +54,7 @@ var _ = Describe("store e2e", Ordered, func() { Expect(run.Run(ctx, "store", "clear")).To(Succeed()) out, err := readFileContent(stdOut) 
Expect(err).NotTo(HaveOccurred()) - Expect(out).To(ContainSubstring("Data store cleared")) + Expect(out).To(ContainSubstring("Store store cleared")) }) }) }) diff --git a/types/executable/executable.gen.go b/types/executable/executable.gen.go index a5d6816..0c269fa 100644 --- a/types/executable/executable.gen.go +++ b/types/executable/executable.gen.go @@ -219,6 +219,22 @@ type ParallelRefConfig struct { // Cmd string `json:"cmd,omitempty" yaml:"cmd,omitempty" mapstructure:"cmd,omitempty"` + // An expression that determines whether the executable should run, using the Expr + // language syntax. + // The expression is evaluated at runtime and must resolve to a boolean value. + // + // The expression has access to OS/architecture information (os, arch), + // environment variables (env), stored data + // (store), and context information (ctx) like workspace and paths. + // + // For example, `os == "darwin"` will only run on macOS, `len(store["feature"]) > + // 0` will run if a value exists + // in the store, and `env["CI"] == "true"` will run in CI environments. + // See the [Expr documentation](https://expr-lang.org/docs/language-definition) + // for more information. + // + If string `json:"if,omitempty" yaml:"if,omitempty" mapstructure:"if,omitempty"` + // A reference to another executable to run in serial. // One of `cmd` or `ref` must be set. // @@ -383,6 +399,22 @@ type SerialRefConfig struct { // Cmd string `json:"cmd,omitempty" yaml:"cmd,omitempty" mapstructure:"cmd,omitempty"` + // An expression that determines whether the executable should run, using the Expr + // language syntax. + // The expression is evaluated at runtime and must resolve to a boolean value. + // + // The expression has access to OS/architecture information (os, arch), + // environment variables (env), stored data + // (store), and context information (ctx) like workspace and paths. 
+ // + // For example, `os == "darwin"` will only run on macOS, `len(store["feature"]) > + // 0` will run if a value exists + // in the store, and `env["CI"] == "true"` will run in CI environments. + // See the [Expr documentation](https://expr-lang.org/docs/language-definition) + // for more information. + // + If string `json:"if,omitempty" yaml:"if,omitempty" mapstructure:"if,omitempty"` + // A reference to another executable to run in serial. // One of `cmd` or `ref` must be set. // diff --git a/types/executable/executable_md.go b/types/executable/executable_md.go index 251caed..c117f6e 100644 --- a/types/executable/executable_md.go +++ b/types/executable/executable_md.go @@ -179,7 +179,7 @@ func renderExecMarkdown(e *ExecutableEnvironment, r *RenderExecutableType) strin mkdwn += fmt.Sprintf("**Template File:** `%s`\n", r.TemplateFile) } if r.TemplateDataFile != "" { - mkdwn += fmt.Sprintf("**Template Data File:** `%s`\n", r.TemplateDataFile) + mkdwn += fmt.Sprintf("**Template Store File:** `%s`\n", r.TemplateDataFile) } mkdwn += execEnvTable(e) return mkdwn diff --git a/types/executable/executable_schema.yaml b/types/executable/executable_schema.yaml index 4f70b35..6dbbf01 100644 --- a/types/executable/executable_schema.yaml +++ b/types/executable/executable_schema.yaml @@ -343,6 +343,19 @@ definitions: A reference to another executable to run in serial. One of `cmd` or `ref` must be set. default: "" + if: + type: string + description: | + An expression that determines whether the executable should run, using the Expr language syntax. + The expression is evaluated at runtime and must resolve to a boolean value. + + The expression has access to OS/architecture information (os, arch), environment variables (env), stored data + (store), and context information (ctx) like workspace and paths. 
+ + For example, `os == "darwin"` will only run on macOS, `len(store["feature"]) > 0` will run if a value exists + in the store, and `env["CI"] == "true"` will run in CI environments. + See the [Expr documentation](https://expr-lang.org/docs/language-definition) for more information. + default: "" args: type: array items: @@ -495,6 +508,19 @@ definitions: A reference to another executable to run in serial. One of `cmd` or `ref` must be set. default: "" + if: + type: string + description: | + An expression that determines whether the executable should run, using the Expr language syntax. + The expression is evaluated at runtime and must resolve to a boolean value. + + The expression has access to OS/architecture information (os, arch), environment variables (env), stored data + (store), and context information (ctx) like workspace and paths. + + For example, `os == "darwin"` will only run on macOS, `len(store["feature"]) > 0` will run if a value exists + in the store, and `env["CI"] == "true"` will run in CI environments. + See the [Expr documentation](https://expr-lang.org/docs/language-definition) for more information. + default: "" args: type: array items: