Make sure current task build id works with queries / caching #665

Merged
6 changes: 2 additions & 4 deletions client/src/lib.rs
@@ -1020,7 +1020,7 @@ impl WorkflowClientTrait for Client {
.and_then(|d| d.try_into().ok()),
workflow_run_timeout: options.run_timeout.and_then(|d| d.try_into().ok()),
workflow_task_timeout: options.task_timeout.and_then(|d| d.try_into().ok()),
search_attributes: options.search_attributes.and_then(|d| d.try_into().ok()),
search_attributes: options.search_attributes.map(|d| d.into()),
cron_schedule: options.cron_schedule.unwrap_or_default(),
request_eager_execution: options.enable_eager_workflow_start,
..Default::default()
@@ -1186,9 +1186,7 @@ impl WorkflowClientTrait for Client {
workflow_task_timeout: workflow_options
.task_timeout
.and_then(|d| d.try_into().ok()),
search_attributes: workflow_options
.search_attributes
.and_then(|d| d.try_into().ok()),
search_attributes: workflow_options.search_attributes.map(|d| d.into()),
cron_schedule: workflow_options.cron_schedule.unwrap_or_default(),
header: options.signal_header,
..Default::default()
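The `search_attributes` change above drops the fallible `try_into().ok()` path in favour of an infallible `Into` conversion, so a conversion failure can no longer be silently swallowed into `None`. A minimal sketch of the pattern, using hypothetical stand-in types rather than the SDK's real ones:

```rust
// Hypothetical stand-ins for the options field and the proto message; the real
// SDK types differ, this only shows the Option-combinator change.
use std::collections::HashMap;

#[derive(Default)]
struct SearchAttributes(HashMap<String, String>);

#[derive(Debug, Default, PartialEq)]
struct ProtoSearchAttributes {
    indexed_fields: HashMap<String, String>,
}

impl From<SearchAttributes> for ProtoSearchAttributes {
    fn from(sa: SearchAttributes) -> Self {
        Self { indexed_fields: sa.0 }
    }
}

fn main() {
    let from_options: Option<SearchAttributes> = Some(SearchAttributes::default());
    // Old shape: `.and_then(|d| d.try_into().ok())` turns a conversion error into
    // `None`, indistinguishable from "no attributes were set".
    // New shape: with an infallible `From` impl there is nothing to swallow.
    let proto: Option<ProtoSearchAttributes> = from_options.map(Into::into);
    assert_eq!(proto, Some(ProtoSearchAttributes::default()));
}
```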
6 changes: 4 additions & 2 deletions core/src/core_tests/activity_tasks.rs
@@ -906,8 +906,10 @@ async fn activity_tasks_from_completion_reserve_slots() {
mh.completion_asserts = Some(Box::new(|wftc| {
// Make sure when we see the completion with the schedule act command that it does
// not have the eager execution flag set the first time, and does the second.
if let Some(Attributes::ScheduleActivityTaskCommandAttributes(attrs)) =
wftc.commands.get(0).and_then(|cmd| cmd.attributes.as_ref())
if let Some(Attributes::ScheduleActivityTaskCommandAttributes(attrs)) = wftc
.commands
.first()
.and_then(|cmd| cmd.attributes.as_ref())
{
if attrs.activity_id == "1" {
assert!(!attrs.request_eager_execution);
2 changes: 1 addition & 1 deletion core/src/protosext/mod.rs
@@ -82,7 +82,7 @@ impl Debug for ValidPollWFTQResponse {
self.previous_started_event_id,
self.started_event_id,
self.history.events.len(),
self.history.events.get(0).map(|e| e.event_id),
self.history.events.first().map(|e| e.event_id),
self.legacy_query,
self.query_requests
)
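This hunk, like several below (and the `VecDeque::front()` change in `core/src/test_help/mod.rs`), is a mechanical `.get(0)` to `.first()` cleanup, presumably driven by Clippy's `get_first` lint; behaviour is unchanged. A quick illustration:

```rust
// Same result, clearer intent: `first()` on slices/Vec and `front()` on VecDeque
// instead of `get(0)`.
use std::collections::VecDeque;

fn main() {
    let events = vec![10_i64, 20, 30];
    assert_eq!(events.get(0), events.first()); // identical Option<&i64>

    let mut tasks: VecDeque<&str> = VecDeque::from(vec!["wft-1", "wft-2"]);
    assert_eq!(tasks.front(), Some(&"wft-1")); // VecDeque's equivalent accessor
    tasks.pop_front();
    assert_eq!(tasks.front(), Some(&"wft-2"));
}
```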
38 changes: 19 additions & 19 deletions core/src/telemetry/mod.rs
@@ -279,25 +279,6 @@ fn metric_temporality_to_selector(
MetricTemporality::Delta => ConstantTemporality(Temporality::Delta),
}
}

#[cfg(test)]
pub mod test_initters {
use super::*;
use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder;

#[allow(dead_code)] // Not always used, called to enable for debugging when needed
pub fn test_telem_console() {
telemetry_init_global(
TelemetryOptionsBuilder::default()
.logging(Logger::Console {
filter: construct_filter_string(Level::DEBUG, Level::WARN),
})
.build()
.unwrap(),
)
.unwrap();
}
}
#[cfg(test)]
pub use test_initters::*;

@@ -326,3 +307,22 @@ where
format!("[{}]", self.iter().format(","))
}
}

#[cfg(test)]
pub mod test_initters {
use super::*;
use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder;

#[allow(dead_code)] // Not always used, called to enable for debugging when needed
pub fn test_telem_console() {
telemetry_init_global(
TelemetryOptionsBuilder::default()
.logging(Logger::Console {
filter: construct_filter_string(Level::DEBUG, Level::WARN),
})
.build()
.unwrap(),
)
.unwrap();
}
}
2 changes: 1 addition & 1 deletion core/src/test_help/mod.rs
@@ -591,7 +591,7 @@ pub(crate) fn build_mock_pollers(mut cfg: MockPollCfg) -> MocksHolder {
for (_, tasks) in &mut resp_iter {
// Must extract run id from a workflow task associated with this workflow
// TODO: Case where run id changes for same workflow id is not handled here
if let Some(t) = tasks.get(0) {
if let Some(t) = tasks.front() {
let rid = t.workflow_execution.as_ref().unwrap().run_id.clone();
if !outstanding.has_run(&rid) {
let t = tasks.pop_front().unwrap();
24 changes: 5 additions & 19 deletions core/src/worker/mod.rs
@@ -102,15 +102,7 @@ pub struct Worker {
#[async_trait::async_trait]
impl WorkerTrait for Worker {
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollWfError> {
self.next_workflow_activation().await.map(|mut a| {
// Attach this worker's Build ID to the activation if appropriate. This is done here
// to avoid cloning the ID for every workflow instance. Can be lowered when
// https://github.com/temporalio/sdk-core/issues/567 is done
if !a.is_replaying {
a.build_id_for_current_task = self.config.worker_build_id.clone();
}
a
})
self.next_workflow_activation().await
}

#[instrument(skip(self))]
@@ -227,7 +219,7 @@ impl Worker {

#[allow(clippy::too_many_arguments)] // Not much worth combining here
pub(crate) fn new_with_pollers(
mut config: WorkerConfig,
config: WorkerConfig,
sticky_queue_name: Option<String>,
client: Arc<dyn WorkerClient>,
task_pollers: TaskPollers,
@@ -384,7 +376,7 @@ impl Worker {
wf_client: client.clone(),
workflows: Workflows::new(
build_wf_basics(
&mut config,
config.clone(),
metrics,
shutdown_token.child_token(),
client.capabilities().cloned().unwrap_or_default(),
@@ -669,22 +661,16 @@ pub struct PostActivateHookData<'a> {
}

fn build_wf_basics(
config: &mut WorkerConfig,
config: WorkerConfig,
metrics: MetricsContext,
shutdown_token: CancellationToken,
server_capabilities: get_system_info_response::Capabilities,
) -> WorkflowBasics {
WorkflowBasics {
max_cached_workflows: config.max_cached_workflows,
worker_config: Arc::new(config),
shutdown_token,
metrics,
namespace: config.namespace.clone(),
task_queue: config.task_queue.clone(),
ignore_evicts_on_shutdown: config.ignore_evicts_on_shutdown,
fetching_concurrency: config.fetching_concurrency,
server_capabilities,
#[cfg(feature = "save_wf_inputs")]
wf_state_inputs: config.wf_state_inputs.take(),
}
}

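The `worker/mod.rs` changes above stop mutating the config (`mut config` and the `wf_state_inputs.take()` call go away) and instead hand the whole `WorkerConfig` to `build_wf_basics` by value, where it is wrapped in an `Arc` and shared, replacing the per-field copies of namespace, task queue, and so on. A rough sketch of the shape of that refactor, with simplified stand-in types rather than the SDK's exact definitions:

```rust
// Simplified stand-ins, not the SDK's real structs: the point is replacing
// per-field copies with one shared, immutable Arc<WorkerConfig>.
use std::sync::Arc;

#[derive(Clone)]
struct WorkerConfig {
    namespace: String,
    task_queue: String,
    worker_build_id: String,
}

#[allow(dead_code)]
struct WorkflowBasicsBefore {
    // Previously: individual fields cloned out of the config.
    namespace: String,
    task_queue: String,
}

struct WorkflowBasicsAfter {
    // Now: one handle; namespace, task queue, build ID, etc. are read through it.
    worker_config: Arc<WorkerConfig>,
}

fn main() {
    let config = WorkerConfig {
        namespace: "default".into(),
        task_queue: "my-tq".into(),
        worker_build_id: "build-abc".into(),
    };
    let basics = WorkflowBasicsAfter {
        worker_config: Arc::new(config),
    };
    assert_eq!(basics.worker_config.namespace, "default");
    assert_eq!(basics.worker_config.task_queue, "my-tq");
    // Sharing the config further down is a refcount bump, not a deep copy.
    let _for_machines = Arc::clone(&basics.worker_config);
    // Only the one string that is actually needed gets cloned.
    let _build_id = basics.worker_config.worker_build_id.clone();
}
```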
2 changes: 1 addition & 1 deletion core/src/worker/workflow/history_update.rs
@@ -464,7 +464,7 @@ impl HistoryUpdate {
self.previous_wft_started_id >= 0
}
pub fn first_event_id(&self) -> Option<i64> {
self.events.get(0).map(|e| e.event_id)
self.events.first().map(|e| e.event_id)
}

#[cfg(debug_assertions)]
@@ -275,7 +275,7 @@ impl Cancellable for SignalExternalMachine {
fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<Self::Error>> {
let res = OnEventWrapper::on_event_mut(self, SignalExternalMachineEvents::Cancel)?;
let mut ret = vec![];
match res.get(0) {
match res.first() {
Some(SignalExternalCommand::Cancelled) => {
ret = vec![ResolveSignalExternalWorkflow {
seq: self.shared_state.seq,
@@ -344,7 +344,7 @@ mod tests {
// Ensure the upsert command has an empty map when not using the patched command
if !with_patched_cmd {
mp.completion_asserts = Some(Box::new(|wftc| {
assert_matches!(wftc.commands.get(0).and_then(|c| c.attributes.as_ref()).unwrap(),
assert_matches!(wftc.commands.first().and_then(|c| c.attributes.as_ref()).unwrap(),
Attributes::UpsertWorkflowSearchAttributesCommandAttributes(attrs)
if attrs.search_attributes.as_ref().unwrap().indexed_fields.is_empty())
}));
34 changes: 22 additions & 12 deletions core/src/worker/workflow/machines/workflow_machines.rs
@@ -46,8 +46,10 @@ use std::{
hash::{Hash, Hasher},
iter::Peekable,
rc::Rc,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use temporal_sdk_core_api::worker::WorkerConfig;
use temporal_sdk_core_protos::{
coresdk::{
common::{NamespacedWorkflowExecution, VersioningIntent},
@@ -93,16 +95,12 @@ pub(crate) struct WorkflowMachines {
pub last_processed_event: i64,
/// True if the workflow is replaying from history
pub replaying: bool,
/// Namespace this workflow exists in
namespace: String,
/// Workflow identifier
pub workflow_id: String,
/// Workflow type identifier. (Function name, class, etc)
pub workflow_type: String,
/// Identifies the current run
pub run_id: String,
/// The task queue this workflow is operating within
task_queue: String,
/// Is set to true once we've seen the final event in workflow history, to avoid accidentally
/// re-applying the final workflow task.
pub have_seen_terminal_event: bool,
@@ -160,6 +158,7 @@ pub(crate) struct WorkflowMachines {

/// Metrics context
pub metrics: MetricsContext,
worker_config: Arc<WorkerConfig>,
}

#[derive(Debug, derive_more::Display)]
Expand Down Expand Up @@ -246,11 +245,9 @@ impl WorkflowMachines {
Self {
last_history_from_server: basics.history,
protocol_msgs: vec![],
namespace: basics.namespace,
workflow_id: basics.workflow_id,
workflow_type: basics.workflow_type,
run_id: basics.run_id,
task_queue: basics.task_queue,
drive_me: driven_wf,
replaying,
metrics: basics.metrics,
@@ -277,6 +274,7 @@
encountered_patch_markers: Default::default(),
local_activity_data: LocalActivityData::default(),
have_seen_terminal_event: false,
worker_config: basics.worker_config,
}
}

@@ -420,9 +418,15 @@ impl WorkflowMachines {
Some(workflow_activation_job::Variant::QueryWorkflow(_))
)
});
        let is_replaying = self.replaying || all_query;

Review comment (Contributor): Aren't queries considered replaying anyway?
Reply (Member, author): Not until this point, that's where it happens.

        let build_id_for_current_task = if is_replaying {
            self.current_wft_build_id.clone().unwrap_or_default()

Review comment (Contributor): Does core need the same logic as Go to check for an empty build ID?
Reply (Member, author): The lookahead works a bit differently and avoids this problem - I actually ended up changing the Go one to be a bit simpler and more similar in my last fix too.

        } else {
            self.worker_config.worker_build_id.clone()
        };
WorkflowActivation {
timestamp: self.current_wf_time.map(Into::into),
is_replaying: self.replaying || all_query,
is_replaying,
run_id: self.run_id.clone(),
history_length: self.last_processed_event as u32,
jobs,
@@ -432,7 +436,7 @@
.collect(),
history_size_bytes: self.history_size_bytes,
continue_as_new_suggested: self.continue_as_new_suggested,
build_id_for_current_task: self.current_wft_build_id.clone().unwrap_or_default(),
build_id_for_current_task,
}
}

@@ -447,7 +451,13 @@
.any(|v| v.is_la_resolution)
}

pub(crate) fn get_metadata_for_wft_complete(&self) -> WorkflowTaskCompletedMetadata {
pub(crate) fn get_metadata_for_wft_complete(&mut self) -> WorkflowTaskCompletedMetadata {
// If this worker has a build ID and we're completing the task, we want to say our ID is the
// current build ID, so that if we get a query before any new history, we properly can
// report that our ID was the one used for the completion.
if !self.worker_config.worker_build_id.is_empty() {
self.current_wft_build_id = Some(self.worker_config.worker_build_id.clone());
}
(*self.observed_internal_flags)
.borrow_mut()
.gather_for_wft_complete()
@@ -1371,7 +1381,7 @@ impl WorkflowMachines {
}
Some(cancel_we::Target::ChildWorkflowId(wfid)) => (
NamespacedWorkflowExecution {
namespace: self.namespace.clone(),
namespace: self.worker_config.namespace.clone(),
workflow_id: wfid,
run_id: "".to_string(),
},
@@ -1392,7 +1402,7 @@
WFCommand::SignalExternalWorkflow(attrs) => {
let seq = attrs.seq;
self.add_cmd_to_wf_task(
new_external_signal(attrs, &self.namespace)?,
new_external_signal(attrs, &self.worker_config.namespace)?,
CommandID::SignalExternal(seq).into(),
);
}
@@ -1539,7 +1549,7 @@
VersioningIntent::Unspecified => {
// If the target TQ is empty, that means use same TQ.
// When TQs match, use compat by default
target_tq.is_empty() || target_tq == self.task_queue
target_tq.is_empty() || target_tq == self.worker_config.task_queue
}
}
}
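Summarizing the behaviour change in `workflow_machines.rs`: when an activation is replaying (which now explicitly includes query-only activations), the reported build ID comes from what was observed in history; otherwise it is this worker's configured build ID. Completing a workflow task also records the worker's ID as the current one, so a query that arrives before any new history still reports the right ID. A condensed, paraphrased sketch of that selection, not the SDK's actual code:

```rust
// Paraphrased selection logic, not the SDK's actual code.
fn build_id_for_activation(
    is_replaying: bool,                  // now also true for query-only activations
    build_id_from_history: Option<&str>, // last build ID observed in the event history
    worker_build_id: &str,               // this worker's configured build ID
) -> String {
    if is_replaying {
        build_id_from_history.unwrap_or_default().to_string()
    } else {
        worker_build_id.to_string()
    }
}

fn main() {
    // Replaying: report whatever history recorded, even if that is empty.
    assert_eq!(build_id_for_activation(true, Some("1.0"), "2.0"), "1.0");
    assert_eq!(build_id_for_activation(true, None, "2.0"), "");
    // New work: report this worker's ID. Because completing a task also records
    // the worker's ID as "current", a query that arrives before any new history
    // is written reports the ID that actually produced the completion.
    assert_eq!(build_id_for_activation(false, None, "2.0"), "2.0");
}
```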
20 changes: 8 additions & 12 deletions core/src/worker/workflow/mod.rs
@@ -58,7 +58,10 @@ use std::{
thread,
time::{Duration, Instant},
};
use temporal_sdk_core_api::errors::{CompleteWfError, PollWfError};
use temporal_sdk_core_api::{
errors::{CompleteWfError, PollWfError},
worker::WorkerConfig,
};
use temporal_sdk_core_protos::{
coresdk::{
workflow_activation::{
@@ -127,24 +130,17 @@ pub(crate) struct Workflows {
}

pub(crate) struct WorkflowBasics {
pub max_cached_workflows: usize,
pub worker_config: Arc<WorkerConfig>,
pub shutdown_token: CancellationToken,
pub metrics: MetricsContext,
pub namespace: String,
pub task_queue: String,
pub ignore_evicts_on_shutdown: bool,
pub fetching_concurrency: usize,
pub server_capabilities: get_system_info_response::Capabilities,
#[cfg(feature = "save_wf_inputs")]
pub wf_state_inputs: Option<UnboundedSender<Vec<u8>>>,
}

pub(crate) struct RunBasics<'a> {
pub namespace: String,
pub worker_config: Arc<WorkerConfig>,
pub workflow_id: String,
pub workflow_type: String,
pub run_id: String,
pub task_queue: String,
pub history: HistoryUpdate,
pub metrics: MetricsContext,
pub capabilities: &'a get_system_info_response::Capabilities,
@@ -167,10 +163,10 @@ impl Workflows {
let (local_tx, local_rx) = unbounded_channel();
let (fetch_tx, fetch_rx) = unbounded_channel();
let shutdown_tok = basics.shutdown_token.clone();
let task_queue = basics.task_queue.clone();
let task_queue = basics.worker_config.task_queue.clone();
let extracted_wft_stream = WFTExtractor::build(
client.clone(),
basics.fetching_concurrency,
basics.worker_config.fetching_concurrency,
wft_stream,
UnboundedReceiverStream::new(fetch_rx),
);