From 670e56fb1b439ff902dac4079c901ff145c207fb Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 1 May 2024 11:52:08 -0700 Subject: [PATCH] FnAPI proto changes for ordered list state. (#31092) --- .../model/fn_execution/v1/beam_fn_api.proto | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto index 4b40c7fa4e4e..df61d6041a04 100644 --- a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto +++ b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto @@ -1021,6 +1021,29 @@ message StateKey { bytes map_key = 5; } + // Represents a request for an ordered list of values associated with a + // specified user key and window for a PTransform. See + // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further + // details. + // + // The response data stream will be a concatenation of all entries of sort key + // and V's associated with the specified user key and window. + // See https://s.apache.org/beam-fn-api-send-and-receive-data for further + // details. + message OrderedListUserState { + // (Required) The id of the PTransform containing user state. + string transform_id = 1; + // (Required) The id of the user state. + string user_state_id = 2; + // (Required) The window encoded in a nested context. + bytes window = 3; + // (Required) The key of the currently executing element encoded in a + // nested context. + bytes key = 4; + // (Required) The sort range encoded in a nested context. + OrderedListRange range = 5; + } + // (Required) One of the following state keys must be set. oneof type { Runner runner = 1; @@ -1031,6 +1054,7 @@ message StateKey { MultimapKeysValuesSideInput multimap_keys_values_side_input = 8; MultimapKeysUserState multimap_keys_user_state = 6; MultimapUserState multimap_user_state = 7; + OrderedListUserState ordered_list_user_state = 9; } } @@ -1055,6 +1079,8 @@ message StateGetResponse { // Represents a part of a logical byte stream. Elements within // the logical byte stream are encoded in the nested context and // concatenated together. + // + // See also the note about OrderedListState in StateAppendRequest. bytes data = 2; } @@ -1063,6 +1089,11 @@ message StateAppendRequest { // Represents a part of a logical byte stream. Elements within // the logical byte stream are encoded in the nested context and // multiple append requests are concatenated together. + // + // For OrderedListState, elements of should be encoded with the + // beam:coder:kv:v1 coder, where the first (key) component must be a + // beam:coder:varint:v1 and the second (value) component must be encoded + // with a beam:coder:length_prefix:v1 coder. bytes data = 1; } @@ -1075,6 +1106,12 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} +// A message describes a sort key range [start, end). +message OrderedListRange { + int64 start = 1; + int64 end = 2; +} + /* * Logging API *