Skip to content

Commit

Permalink
Remove schedule-to-close timeout. Log on timeout firing.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Nov 2, 2023
1 parent eb0726f commit db6bff4
Showing 1 changed file with 17 additions and 49 deletions.
66 changes: 17 additions & 49 deletions core/src/worker/activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::{Duration, Instant, SystemTime},
time::{Duration, Instant},
};
use temporal_sdk_core_protos::{
coresdk::{
Expand Down Expand Up @@ -542,44 +542,25 @@ where
} else {
// Fire off task to keep track of local timeouts. We do this so that
// activities can still get cleaned up even if the user isn't
// heartbeating.
// heartbeating. Schedule to closed is not tracked due to the
// possibility of clock skew messing things up, and it's relative
// unlikeliness compared to the other timeouts.
let local_timeout_buffer = self.local_timeout_buffer;
let sched = task
.resp
.schedule_to_close_timeout
.clone()
.and_then(|to| {
// Must filter out zero before any subtraction math here
if to.seconds == 0 && to.nanos == 0 {
None
} else {
Some(to)
}
})
.zip(task.resp.scheduled_time.clone())
.and_then(|(to, st)| {
let now = SystemTime::now();
let sched_time = SystemTime::try_from(st).unwrap_or(now);
let already_elapsed =
now.duration_since(sched_time).unwrap_or_default();
Duration::try_from(to).map(|to| to - already_elapsed).ok()
});
static HEARTBEAT_TYPE: &str = "heartbeat";
let timeout_at = [
task.resp.heartbeat_timeout.clone(),
task.resp.start_to_close_timeout.clone(),
(HEARTBEAT_TYPE, task.resp.heartbeat_timeout.clone()),
("start_to_close", task.resp.start_to_close_timeout.clone()),
]
.into_iter()
.enumerate()
.filter_map(|(i, d)| {
d.and_then(|d| Duration::try_from(d).ok().map(|d| (i, d)))
.filter_map(|(k, d)| {
d.and_then(|d| Duration::try_from(d).ok().map(|d| (k, d)))
})
.chain(sched.map(|s| (2, s)))
.filter(|(_, d)| !d.is_zero())
.min_by(|(_, d1), (_, d2)| d1.cmp(d2));
if let Some((timeout_type, timeout_at)) = timeout_at {
let sleep_time = timeout_at + local_timeout_buffer;
let cancel_tx = cancels_tx.clone();
let resetter = if timeout_type == 0 {
let resetter = if timeout_type == HEARTBEAT_TYPE {
Some(Arc::new(Notify::new()))
} else {
None
Expand All @@ -597,6 +578,11 @@ where
} else {
tokio::time::sleep(sleep_time).await;
}
debug!(
task_token=%tt,
"Timing out activity due to elapsed local \
{timeout_type} timer"
);
let _ = cancel_tx.send(PendingActivityCancel {
task_token: tt,
reason: ActivityCancelReason::TimedOut,
Expand Down Expand Up @@ -706,7 +692,6 @@ mod tests {
use crate::{
pollers::new_activity_task_buffer, prost_dur, worker::client::mocks::mock_workflow_client,
};
use std::{ops::Sub, time::SystemTime};
use temporal_sdk_core_protos::coresdk::activity_result::ActivityExecutionResult;

#[tokio::test]
Expand Down Expand Up @@ -801,7 +786,7 @@ mod tests {
activity_id: "act1".to_string(),
start_to_close_timeout: Some(prost_dur!(from_millis(100))),
// Verify zero durations do not apply
schedule_to_close_timeout: Some(prost_dur!(from_millis(0))),
heartbeat_timeout: Some(prost_dur!(from_millis(0))),
..Default::default()
})
});
Expand Down Expand Up @@ -829,23 +814,6 @@ mod tests {
..Default::default()
})
});
mock_client
.expect_poll_activity_task()
.times(1)
.returning(move |_, _| {
Ok(PollActivityTaskQueueResponse {
task_token: vec![4],
activity_id: "act4".to_string(),
schedule_to_close_timeout: Some(prost_dur!(from_secs_f32(100.1))),
scheduled_time: Some(
SystemTime::now()
.sub(Duration::from_secs(100))
.try_into()
.unwrap(),
),
..Default::default()
})
});
let mock_client = Arc::new(mock_client);
let sem = Arc::new(MeteredSemaphore::new(
1, // Just one task at a time
Expand Down Expand Up @@ -874,7 +842,7 @@ mod tests {
Duration::from_millis(100), // Short buffer for unit test
);

for _ in 1..=4 {
for _ in 1..=3 {
let start = Instant::now();
let t = atm.poll().await.unwrap();
// Just don't do anything when we get the task, wait for the timeout to come.
Expand Down

0 comments on commit db6bff4

Please sign in to comment.