Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into make-client-dynamic
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal committed Jan 25, 2024
2 parents 3d770d2 + cb08b1b commit 5128702
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 24 deletions.
91 changes: 90 additions & 1 deletion core/src/core_tests/queries.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{
test_help::{
build_mock_pollers, canned_histories, hist_to_poll_resp, mock_worker, single_hist_mock_sg,
MockPollCfg, ResponseType, WorkerExt,
MockPollCfg, MocksHolder, ResponseType, WorkerExt,
},
worker::{client::mocks::mock_workflow_client, LEGACY_QUERY_ID},
};
use futures_util::stream;
use std::{
collections::{HashMap, VecDeque},
time::Duration,
Expand Down Expand Up @@ -891,3 +892,91 @@ async fn build_id_set_properly_on_query_on_first_task() {
.unwrap();
core.drain_pollers_and_shutdown().await;
}

#[tokio::test]
async fn queries_arent_lost_in_buffer_void() {
let wfid = "fake_wf_id";
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
t.add_we_signaled("sig", vec![]);
t.add_full_wf_task();
t.add_workflow_execution_completed();
let tasks = [
hist_to_poll_resp(&t, wfid.to_owned(), 1.into()),
{
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), 1.into());
pr.query = Some(WorkflowQuery {
query_type: "1".to_string(),
..Default::default()
});
pr.started_event_id = 0;
pr
},
{
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), 1.into());
pr.query = Some(WorkflowQuery {
query_type: "2".to_string(),
..Default::default()
});
pr.started_event_id = 0;
pr
},
hist_to_poll_resp(&t, wfid.to_owned(), 2.into()),
]
.map(|r| r.resp);

let mut mock = mock_workflow_client();
mock.expect_complete_workflow_task()
.returning(|_| Ok(Default::default()));
mock.expect_respond_legacy_query()
.times(2)
.returning(|_, _| Ok(Default::default()));
let mut mock = MocksHolder::from_wft_stream(mock, stream::iter(tasks));
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
let core = mock_worker(mock);

let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
.await
.unwrap();

let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)),
}] => q.query_type == "1"
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
query_ok(LEGACY_QUERY_ID.to_string(), "hi"),
))
.await
.unwrap();

let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)),
}] => q.query_type == "2"
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
query_ok(LEGACY_QUERY_ID.to_string(), "hi"),
))
.await
.unwrap();

let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)),
}]
);
core.complete_execution(&task.run_id).await;

core.shutdown().await;
}
9 changes: 9 additions & 0 deletions core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use mockall::TimesRange;
use parking_lot::RwLock;
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
fmt::Debug,
ops::{Deref, DerefMut},
pin::Pin,
sync::{
Expand Down Expand Up @@ -722,6 +723,14 @@ impl<T> DerefMut for QueueResponse<T> {
&mut self.resp
}
}
impl<T> Debug for QueueResponse<T>
where
T: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.resp.fmt(f)
}
}

pub fn hist_to_poll_resp(
t: &TestHistoryBuilder,
Expand Down
39 changes: 30 additions & 9 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use std::{
collections::VecDeque,
fmt::Debug,
future::Future,
mem,
mem::discriminant,
ops::DerefMut,
rc::Rc,
Expand Down Expand Up @@ -1023,17 +1024,31 @@ struct BufferedTasks {
/// supersede any old one.
wft: Option<PermittedWFT>,
/// For query only tasks, multiple may be received concurrently and it's OK to buffer more
/// than one - however they should be dropped if, by the time we try to process them, we
/// have already processed a newer real WFT than the one the query was targeting (otherwise
/// we'd return data from the "future").
/// than one - however they must all be handled before applying the next "real" wft (after the
/// current one has been processed).
query_only_tasks: VecDeque<PermittedWFT>,
/// These are query-only tasks for the *buffered* wft, if any. They will all be discarded if
/// a buffered wft is replaced before being handled. They move to `query_only_tasks` once the
/// buffered task is taken.
query_only_tasks_for_buffered: VecDeque<PermittedWFT>,
}

impl BufferedTasks {
/// Buffers a new task. If it is a query-only task, multiple such tasks may be buffered which
/// all will be handled at the end of the current WFT. If a new WFT which would advance history
/// is provided, it will be buffered - but if another such task comes in while there is already
/// one buffered, the old one will be overriden, and all queries will be invalidated.
fn buffer(&mut self, task: PermittedWFT) {
if task.work.is_query_only() {
self.query_only_tasks.push_back(task);
if self.wft.is_none() {
self.query_only_tasks.push_back(task);
} else {
self.query_only_tasks_for_buffered.push_back(task);
}
} else {
if self.wft.is_some() {
self.query_only_tasks_for_buffered.clear();
}
let _ = self.wft.insert(task);
}
}
Expand All @@ -1042,12 +1057,18 @@ impl BufferedTasks {
self.wft.is_some() || !self.query_only_tasks.is_empty()
}

/// Remove and return the next WFT from the buffer that should be applied. WFTs which would
/// advance workflow state are returned before query-only tasks.
/// Remove and return the next WFT from the buffer that should be applied. Queries are returned
/// first for the current workflow task, if there are any. If not, the next WFT that would
/// advance history is returned.
fn get_next_wft(&mut self) -> Option<PermittedWFT> {
self.wft
.take()
.or_else(|| self.query_only_tasks.pop_front())
if let Some(q) = self.query_only_tasks.pop_front() {
return Some(q);
}
if let Some(t) = self.wft.take() {
self.query_only_tasks = mem::take(&mut self.query_only_tasks_for_buffered);
return Some(t);
}
None
}
}

Expand Down
4 changes: 2 additions & 2 deletions sdk-core-protos/src/history_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::temporal::api::{
workflowservice::v1::{GetWorkflowExecutionHistoryResponse, PollWorkflowTaskQueueResponse},
};
use anyhow::{anyhow, bail};
use rand::{thread_rng, Rng};
use rand::random;

/// Contains information about a validated history. Used for replay and other testing.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -154,7 +154,7 @@ impl HistoryInfo {
/// randomly generated task token. Caller should attach a meaningful `workflow_execution` if
/// needed.
pub fn as_poll_wft_response(&self) -> PollWorkflowTaskQueueResponse {
let task_token: [u8; 16] = thread_rng().gen();
let task_token: [u8; 16] = random();
PollWorkflowTaskQueueResponse {
history: Some(History {
events: self.events.clone(),
Expand Down
24 changes: 12 additions & 12 deletions tests/integ_tests/queries_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ async fn multiple_concurrent_queries_no_new_history() {
}

#[tokio::test]
async fn query_superseded_by_newer_wft_is_discarded() {
let mut starter = init_core_and_create_wf("query_superseded_by_newer_wft_is_discarded").await;
async fn queries_handled_before_next_wft() {
let mut starter = init_core_and_create_wf("queries_handled_before_next_wft").await;
let core = starter.get_worker().await;
let workflow_id = starter.get_task_queue().to_string();
let task = core.poll_workflow_activation().await.unwrap();
Expand Down Expand Up @@ -447,16 +447,7 @@ async fn query_superseded_by_newer_wft_is_discarded() {
))
.await
.unwrap();
// We should get the signal activation since the in-buffer query should've been failed
let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)),
}]
);
core.complete_execution(&task.run_id).await;
// Query will get retried by server since we fail the task w/ the stale query
// We now get the second query
let task = core.poll_workflow_activation().await.unwrap();
let query = assert_matches!(
task.jobs.as_slice(),
Expand All @@ -479,6 +470,15 @@ async fn query_superseded_by_newer_wft_is_discarded() {
))
.await
.unwrap();
// Then the signal afterward
let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)),
}]
);
core.complete_execution(&task.run_id).await;
};
join!(join_all(query_futs), complete_fut);
drain_pollers_and_shutdown(&core).await;
Expand Down

0 comments on commit 5128702

Please sign in to comment.