diff --git a/client/src/lib.rs b/client/src/lib.rs index b634c6087..622955e24 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1020,7 +1020,7 @@ impl WorkflowClientTrait for Client { .and_then(|d| d.try_into().ok()), workflow_run_timeout: options.run_timeout.and_then(|d| d.try_into().ok()), workflow_task_timeout: options.task_timeout.and_then(|d| d.try_into().ok()), - search_attributes: options.search_attributes.and_then(|d| d.try_into().ok()), + search_attributes: options.search_attributes.map(|d| d.into()), cron_schedule: options.cron_schedule.unwrap_or_default(), request_eager_execution: options.enable_eager_workflow_start, ..Default::default() @@ -1186,9 +1186,7 @@ impl WorkflowClientTrait for Client { workflow_task_timeout: workflow_options .task_timeout .and_then(|d| d.try_into().ok()), - search_attributes: workflow_options - .search_attributes - .and_then(|d| d.try_into().ok()), + search_attributes: workflow_options.search_attributes.map(|d| d.into()), cron_schedule: workflow_options.cron_schedule.unwrap_or_default(), header: options.signal_header, ..Default::default() diff --git a/core/src/core_tests/activity_tasks.rs b/core/src/core_tests/activity_tasks.rs index 6ba4239b9..bb5104d60 100644 --- a/core/src/core_tests/activity_tasks.rs +++ b/core/src/core_tests/activity_tasks.rs @@ -907,7 +907,7 @@ async fn activity_tasks_from_completion_reserve_slots() { // Make sure when we see the completion with the schedule act command that it does // not have the eager execution flag set the first time, and does the second. if let Some(Attributes::ScheduleActivityTaskCommandAttributes(attrs)) = - wftc.commands.get(0).and_then(|cmd| cmd.attributes.as_ref()) + wftc.commands.first().and_then(|cmd| cmd.attributes.as_ref()) { if attrs.activity_id == "1" { assert!(!attrs.request_eager_execution); diff --git a/core/src/protosext/mod.rs b/core/src/protosext/mod.rs index 0ea748122..730e78479 100644 --- a/core/src/protosext/mod.rs +++ b/core/src/protosext/mod.rs @@ -82,7 +82,7 @@ impl Debug for ValidPollWFTQResponse { self.previous_started_event_id, self.started_event_id, self.history.events.len(), - self.history.events.get(0).map(|e| e.event_id), + self.history.events.first().map(|e| e.event_id), self.legacy_query, self.query_requests ) diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index b6d87f650..9f12216f9 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -279,25 +279,6 @@ fn metric_temporality_to_selector( MetricTemporality::Delta => ConstantTemporality(Temporality::Delta), } } - -#[cfg(test)] -pub mod test_initters { - use super::*; - use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder; - - #[allow(dead_code)] // Not always used, called to enable for debugging when needed - pub fn test_telem_console() { - telemetry_init_global( - TelemetryOptionsBuilder::default() - .logging(Logger::Console { - filter: construct_filter_string(Level::DEBUG, Level::WARN), - }) - .build() - .unwrap(), - ) - .unwrap(); - } -} #[cfg(test)] pub use test_initters::*; @@ -326,3 +307,22 @@ where format!("[{}]", self.iter().format(",")) } } + +#[cfg(test)] +pub mod test_initters { + use super::*; + use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder; + + #[allow(dead_code)] // Not always used, called to enable for debugging when needed + pub fn test_telem_console() { + telemetry_init_global( + TelemetryOptionsBuilder::default() + .logging(Logger::Console { + filter: construct_filter_string(Level::DEBUG, Level::WARN), + }) + .build() + .unwrap(), + ) + .unwrap(); + } +} diff --git a/core/src/test_help/mod.rs b/core/src/test_help/mod.rs index 6089e15b9..4eb073fe6 100644 --- a/core/src/test_help/mod.rs +++ b/core/src/test_help/mod.rs @@ -591,7 +591,7 @@ pub(crate) fn build_mock_pollers(mut cfg: MockPollCfg) -> MocksHolder { for (_, tasks) in &mut resp_iter { // Must extract run id from a workflow task associated with this workflow // TODO: Case where run id changes for same workflow id is not handled here - if let Some(t) = tasks.get(0) { + if let Some(t) = tasks.front() { let rid = t.workflow_execution.as_ref().unwrap().run_id.clone(); if !outstanding.has_run(&rid) { let t = tasks.pop_front().unwrap(); diff --git a/core/src/worker/workflow/history_update.rs b/core/src/worker/workflow/history_update.rs index 819c8e3a2..06467a1cd 100644 --- a/core/src/worker/workflow/history_update.rs +++ b/core/src/worker/workflow/history_update.rs @@ -464,7 +464,7 @@ impl HistoryUpdate { self.previous_wft_started_id >= 0 } pub fn first_event_id(&self) -> Option { - self.events.get(0).map(|e| e.event_id) + self.events.first().map(|e| e.event_id) } #[cfg(debug_assertions)] diff --git a/core/src/worker/workflow/machines/signal_external_state_machine.rs b/core/src/worker/workflow/machines/signal_external_state_machine.rs index 59dae3c9e..5049df484 100644 --- a/core/src/worker/workflow/machines/signal_external_state_machine.rs +++ b/core/src/worker/workflow/machines/signal_external_state_machine.rs @@ -275,7 +275,7 @@ impl Cancellable for SignalExternalMachine { fn cancel(&mut self) -> Result, MachineError> { let res = OnEventWrapper::on_event_mut(self, SignalExternalMachineEvents::Cancel)?; let mut ret = vec![]; - match res.get(0) { + match res.first() { Some(SignalExternalCommand::Cancelled) => { ret = vec![ResolveSignalExternalWorkflow { seq: self.shared_state.seq, diff --git a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs index 67bc5c7c2..cf15c4ba6 100644 --- a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs +++ b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs @@ -344,7 +344,7 @@ mod tests { // Ensure the upsert command has an empty map when not using the patched command if !with_patched_cmd { mp.completion_asserts = Some(Box::new(|wftc| { - assert_matches!(wftc.commands.get(0).and_then(|c| c.attributes.as_ref()).unwrap(), + assert_matches!(wftc.commands.first().and_then(|c| c.attributes.as_ref()).unwrap(), Attributes::UpsertWorkflowSearchAttributesCommandAttributes(attrs) if attrs.search_attributes.as_ref().unwrap().indexed_fields.is_empty()) })); diff --git a/core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs b/core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs index e455078d2..d9f035149 100644 --- a/core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs +++ b/core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs @@ -23,7 +23,7 @@ use tokio_util::sync::CancellationToken; /// /// Use `CoreWfStarter::enable_wf_state_input_recording` from the integration test utilities to /// activate saving the data to disk, and use the `wf_input_replay` example binary to replay. -pub async fn replay_wf_state_inputs(mut config: WorkerConfig, inputs: impl Stream>) { +pub async fn replay_wf_state_inputs(config: WorkerConfig, inputs: impl Stream>) { use crate::worker::build_wf_basics; let la_resp_q = Arc::new(SegQueue::new()); @@ -43,7 +43,7 @@ pub async fn replay_wf_state_inputs(mut config: WorkerConfig, inputs: impl Strea }) }); let basics = build_wf_basics( - &mut config, + config, MetricsContext::no_op(), CancellationToken::new(), DEFAULT_TEST_CAPABILITIES.clone(), diff --git a/sdk-core-protos/src/history_info.rs b/sdk-core-protos/src/history_info.rs index 5bdd565c7..62086cbf6 100644 --- a/sdk-core-protos/src/history_info.rs +++ b/sdk-core-protos/src/history_info.rs @@ -37,7 +37,7 @@ impl HistoryInfo { let mut workflow_task_started_event_id = 0; let mut wf_task_count = 0; let mut history = events.iter().peekable(); - let started_attrs = match &events.get(0).unwrap().attributes { + let started_attrs = match &events.first().unwrap().attributes { Some(history_event::Attributes::WorkflowExecutionStartedEventAttributes(attrs)) => { attrs.clone() } diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index 8d2e4d0f7..151838aae 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -336,7 +336,7 @@ pub mod coresdk { ) -> Option { details .get("data") - .and_then(|p| p.payloads.get(0)) + .and_then(|p| p.payloads.first()) .and_then(|p| std::str::from_utf8(&p.data).ok()) .and_then(|s| serde_json::from_str(s).ok()) } diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index ed1a85dc6..c5f6d97a1 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -236,7 +236,7 @@ impl WorkflowFuture { let defp = Payload::default(); let val_res = if u.run_validator { match panic::catch_unwind(AssertUnwindSafe(|| { - (impls.validator)(&info, u.input.get(0).unwrap_or(&defp)) + (impls.validator)(&info, u.input.first().unwrap_or(&defp)) })) { Ok(r) => r, Err(e) => { @@ -257,7 +257,7 @@ impl WorkflowFuture { wf_ctx: self.wf_ctx.clone(), info, }, - u.input.get(0).unwrap_or(&defp), + u.input.first().unwrap_or(&defp), ); self.update_futures .push((u.protocol_instance_id, handler_fut)); diff --git a/tests/integ_tests/visibility_tests.rs b/tests/integ_tests/visibility_tests.rs index c7840c8d7..4df85ef6f 100644 --- a/tests/integ_tests/visibility_tests.rs +++ b/tests/integ_tests/visibility_tests.rs @@ -35,8 +35,8 @@ async fn client_list_open_closed_workflow_executions() { // List above OPEN workflow let start_time_filter = StartTimeFilter { - earliest_time: Some(earliest).and_then(|t| t.try_into().ok()), - latest_time: Some(latest).and_then(|t| t.try_into().ok()), + earliest_time: Some(earliest).map(|t| t.into()), + latest_time: Some(latest).map(|t| t.into()), }; let filter = ListOpenFilters::ExecutionFilter(WorkflowExecutionFilter { workflow_id: wf_name.clone(), @@ -63,8 +63,8 @@ async fn client_list_open_closed_workflow_executions() { 1, Default::default(), Some(StartTimeFilter { - earliest_time: Some(earliest).and_then(|t| t.try_into().ok()), - latest_time: Some(latest).and_then(|t| t.try_into().ok()), + earliest_time: Some(earliest).map(|t| t.into()), + latest_time: Some(latest).map(|t| t.into()), }), Some(ListClosedFilters::ExecutionFilter( WorkflowExecutionFilter {