Skip to content

Commit

Permalink
New Rust lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 8, 2024
1 parent 0effdc9 commit 1312603
Show file tree
Hide file tree
Showing 13 changed files with 37 additions and 39 deletions.
6 changes: 2 additions & 4 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ impl WorkflowClientTrait for Client {
.and_then(|d| d.try_into().ok()),
workflow_run_timeout: options.run_timeout.and_then(|d| d.try_into().ok()),
workflow_task_timeout: options.task_timeout.and_then(|d| d.try_into().ok()),
search_attributes: options.search_attributes.and_then(|d| d.try_into().ok()),
search_attributes: options.search_attributes.map(|d| d.into()),
cron_schedule: options.cron_schedule.unwrap_or_default(),
request_eager_execution: options.enable_eager_workflow_start,
..Default::default()
Expand Down Expand Up @@ -1186,9 +1186,7 @@ impl WorkflowClientTrait for Client {
workflow_task_timeout: workflow_options
.task_timeout
.and_then(|d| d.try_into().ok()),
search_attributes: workflow_options
.search_attributes
.and_then(|d| d.try_into().ok()),
search_attributes: workflow_options.search_attributes.map(|d| d.into()),
cron_schedule: workflow_options.cron_schedule.unwrap_or_default(),
header: options.signal_header,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ async fn activity_tasks_from_completion_reserve_slots() {
// Make sure when we see the completion with the schedule act command that it does
// not have the eager execution flag set the first time, and does the second.
if let Some(Attributes::ScheduleActivityTaskCommandAttributes(attrs)) =
wftc.commands.get(0).and_then(|cmd| cmd.attributes.as_ref())
wftc.commands.first().and_then(|cmd| cmd.attributes.as_ref())
{
if attrs.activity_id == "1" {
assert!(!attrs.request_eager_execution);
Expand Down
2 changes: 1 addition & 1 deletion core/src/protosext/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Debug for ValidPollWFTQResponse {
self.previous_started_event_id,
self.started_event_id,
self.history.events.len(),
self.history.events.get(0).map(|e| e.event_id),
self.history.events.first().map(|e| e.event_id),
self.legacy_query,
self.query_requests
)
Expand Down
38 changes: 19 additions & 19 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,25 +279,6 @@ fn metric_temporality_to_selector(
MetricTemporality::Delta => ConstantTemporality(Temporality::Delta),
}
}

#[cfg(test)]
pub mod test_initters {
use super::*;
use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder;

#[allow(dead_code)] // Not always used, called to enable for debugging when needed
pub fn test_telem_console() {
telemetry_init_global(
TelemetryOptionsBuilder::default()
.logging(Logger::Console {
filter: construct_filter_string(Level::DEBUG, Level::WARN),
})
.build()
.unwrap(),
)
.unwrap();
}
}
#[cfg(test)]
pub use test_initters::*;

Expand Down Expand Up @@ -326,3 +307,22 @@ where
format!("[{}]", self.iter().format(","))
}
}

#[cfg(test)]
pub mod test_initters {
use super::*;
use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder;

#[allow(dead_code)] // Not always used, called to enable for debugging when needed
pub fn test_telem_console() {
telemetry_init_global(
TelemetryOptionsBuilder::default()
.logging(Logger::Console {
filter: construct_filter_string(Level::DEBUG, Level::WARN),
})
.build()
.unwrap(),
)
.unwrap();
}
}
2 changes: 1 addition & 1 deletion core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ pub(crate) fn build_mock_pollers(mut cfg: MockPollCfg) -> MocksHolder {
for (_, tasks) in &mut resp_iter {
// Must extract run id from a workflow task associated with this workflow
// TODO: Case where run id changes for same workflow id is not handled here
if let Some(t) = tasks.get(0) {
if let Some(t) = tasks.front() {
let rid = t.workflow_execution.as_ref().unwrap().run_id.clone();
if !outstanding.has_run(&rid) {
let t = tasks.pop_front().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion core/src/worker/workflow/history_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ impl HistoryUpdate {
self.previous_wft_started_id >= 0
}
pub fn first_event_id(&self) -> Option<i64> {
self.events.get(0).map(|e| e.event_id)
self.events.first().map(|e| e.event_id)
}

#[cfg(debug_assertions)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl Cancellable for SignalExternalMachine {
fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<Self::Error>> {
let res = OnEventWrapper::on_event_mut(self, SignalExternalMachineEvents::Cancel)?;
let mut ret = vec![];
match res.get(0) {
match res.first() {
Some(SignalExternalCommand::Cancelled) => {
ret = vec![ResolveSignalExternalWorkflow {
seq: self.shared_state.seq,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ mod tests {
// Ensure the upsert command has an empty map when not using the patched command
if !with_patched_cmd {
mp.completion_asserts = Some(Box::new(|wftc| {
assert_matches!(wftc.commands.get(0).and_then(|c| c.attributes.as_ref()).unwrap(),
assert_matches!(wftc.commands.first().and_then(|c| c.attributes.as_ref()).unwrap(),
Attributes::UpsertWorkflowSearchAttributesCommandAttributes(attrs)
if attrs.search_attributes.as_ref().unwrap().indexed_fields.is_empty())
}));
Expand Down
4 changes: 2 additions & 2 deletions core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio_util::sync::CancellationToken;
///
/// Use `CoreWfStarter::enable_wf_state_input_recording` from the integration test utilities to
/// activate saving the data to disk, and use the `wf_input_replay` example binary to replay.
pub async fn replay_wf_state_inputs(mut config: WorkerConfig, inputs: impl Stream<Item = Vec<u8>>) {
pub async fn replay_wf_state_inputs(config: WorkerConfig, inputs: impl Stream<Item = Vec<u8>>) {
use crate::worker::build_wf_basics;

let la_resp_q = Arc::new(SegQueue::new());
Expand All @@ -43,7 +43,7 @@ pub async fn replay_wf_state_inputs(mut config: WorkerConfig, inputs: impl Strea
})
});
let basics = build_wf_basics(
&mut config,
config,
MetricsContext::no_op(),
CancellationToken::new(),
DEFAULT_TEST_CAPABILITIES.clone(),
Expand Down
2 changes: 1 addition & 1 deletion sdk-core-protos/src/history_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl HistoryInfo {
let mut workflow_task_started_event_id = 0;
let mut wf_task_count = 0;
let mut history = events.iter().peekable();
let started_attrs = match &events.get(0).unwrap().attributes {
let started_attrs = match &events.first().unwrap().attributes {
Some(history_event::Attributes::WorkflowExecutionStartedEventAttributes(attrs)) => {
attrs.clone()
}
Expand Down
2 changes: 1 addition & 1 deletion sdk-core-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ pub mod coresdk {
) -> Option<LocalActivityMarkerData> {
details
.get("data")
.and_then(|p| p.payloads.get(0))
.and_then(|p| p.payloads.first())
.and_then(|p| std::str::from_utf8(&p.data).ok())
.and_then(|s| serde_json::from_str(s).ok())
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/workflow_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl WorkflowFuture {
let defp = Payload::default();
let val_res = if u.run_validator {
match panic::catch_unwind(AssertUnwindSafe(|| {
(impls.validator)(&info, u.input.get(0).unwrap_or(&defp))
(impls.validator)(&info, u.input.first().unwrap_or(&defp))
})) {
Ok(r) => r,
Err(e) => {
Expand All @@ -257,7 +257,7 @@ impl WorkflowFuture {
wf_ctx: self.wf_ctx.clone(),
info,
},
u.input.get(0).unwrap_or(&defp),
u.input.first().unwrap_or(&defp),
);
self.update_futures
.push((u.protocol_instance_id, handler_fut));
Expand Down
8 changes: 4 additions & 4 deletions tests/integ_tests/visibility_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ async fn client_list_open_closed_workflow_executions() {

// List above OPEN workflow
let start_time_filter = StartTimeFilter {
earliest_time: Some(earliest).and_then(|t| t.try_into().ok()),
latest_time: Some(latest).and_then(|t| t.try_into().ok()),
earliest_time: Some(earliest).map(|t| t.into()),
latest_time: Some(latest).map(|t| t.into()),
};
let filter = ListOpenFilters::ExecutionFilter(WorkflowExecutionFilter {
workflow_id: wf_name.clone(),
Expand All @@ -63,8 +63,8 @@ async fn client_list_open_closed_workflow_executions() {
1,
Default::default(),
Some(StartTimeFilter {
earliest_time: Some(earliest).and_then(|t| t.try_into().ok()),
latest_time: Some(latest).and_then(|t| t.try_into().ok()),
earliest_time: Some(earliest).map(|t| t.into()),
latest_time: Some(latest).map(|t| t.into()),
}),
Some(ListClosedFilters::ExecutionFilter(
WorkflowExecutionFilter {
Expand Down

0 comments on commit 1312603

Please sign in to comment.