Skip to content

Commit

Permalink
Merge branch 'master' into cancel-child-wf
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh authored Dec 3, 2024
2 parents 5f6d006 + b207660 commit 06c326b
Show file tree
Hide file tree
Showing 49 changed files with 759 additions and 507 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ license-file = "LICENSE.txt"
[workspace.dependencies]
derive_builder = "0.20"
derive_more = { version = "1.0", features = ["constructor", "display", "from", "into", "debug"] }
thiserror = "2"
tonic = "0.12"
tonic-build = "0.12"
opentelemetry = { version = "0.24", features = ["metrics"] }
opentelemetry = { version = "0.26", features = ["metrics"] }
prost = "0.13"
prost-types = "0.13"

Expand Down
2 changes: 1 addition & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ opentelemetry = { workspace = true, features = ["metrics"], optional = true }
parking_lot = "0.12"
prost-types = { workspace = true }
slotmap = "1.0"
thiserror = "1.0"
thiserror = { workspace = true }
tokio = "1.1"
tonic = { workspace = true, features = ["tls", "tls-roots"] }
tower = { version = "0.5", features = ["util"] }
Expand Down
4 changes: 2 additions & 2 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use crate::{
proxy::HttpConnectProxyOptions,
retry::{CallType, RetryClient, RETRYABLE_ERROR_CODES},
};
pub use metrics::{LONG_REQUEST_LATENCY_HISTOGRAM_NAME, REQUEST_LATENCY_HISTOGRAM_NAME};
pub use raw::{CloudService, HealthService, OperatorService, TestService, WorkflowService};
pub use temporal_sdk_core_protos::temporal::api::{
enums::v1::ArchivalState,
Expand All @@ -42,13 +43,12 @@ use crate::{
use backoff::{exponential, ExponentialBackoff, SystemClock};
use http::{uri::InvalidUri, Uri};
use parking_lot::RwLock;
use std::sync::OnceLock;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
sync::{Arc, OnceLock},
time::{Duration, Instant},
};
use temporal_sdk_core_api::telemetry::metrics::TemporalMeter;
Expand Down
9 changes: 7 additions & 2 deletions client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ use temporal_sdk_core_api::telemetry::metrics::{
use tonic::{body::BoxBody, transport::Channel, Code};
use tower::Service;

/// Name of the histogram metric that records client request latencies. The name that is
/// actually exported may carry a prefix in front of this string.
pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency";
/// Name of the histogram metric that records client long-poll request latencies. The name that
/// is actually exported may carry a prefix in front of this string.
pub static LONG_REQUEST_LATENCY_HISTOGRAM_NAME: &str = "long_request_latency";

/// Used to track context associated with metrics, and record/update them
// Possible improvement: make generic over some type tag so that methods are only exposed if the
// appropriate k/vs have already been set.
Expand Down Expand Up @@ -58,12 +63,12 @@ impl MetricsContext {
unit: "".into(),
}),
svc_request_latency: meter.histogram_duration(MetricParameters {
name: "request_latency".into(),
name: REQUEST_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of client request latencies".into(),
}),
long_svc_request_latency: meter.histogram_duration(MetricParameters {
name: "long_request_latency".into(),
name: LONG_REQUEST_LATENCY_HISTOGRAM_NAME.into(),
unit: "duration".into(),
description: "Histogram of client long-poll request latencies".into(),
}),
Expand Down
2 changes: 1 addition & 1 deletion core-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ opentelemetry = { workspace = true, optional = true }
prost = { workspace = true }
prost-types = { workspace = true }
serde_json = "1.0"
thiserror = "1.0"
thiserror = { workspace = true }
tonic = { workspace = true }
tracing-core = "0.1"
url = "2.3"
Expand Down
28 changes: 25 additions & 3 deletions core-api/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub struct OtelCollectorOptions {
/// If set to true, use f64 seconds for durations instead of u64 milliseconds
#[builder(default)]
pub use_seconds_for_durations: bool,
/// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
#[builder(default)]
pub histogram_bucket_overrides: HistogramBucketOverrides,
}

/// Options for exporting metrics to Prometheus
Expand All @@ -78,15 +81,33 @@ pub struct PrometheusExporterOptions {
#[builder(default)]
pub global_tags: HashMap<String, String>,
/// If set true, all counters will include a "_total" suffix
#[builder(default = "false")]
#[builder(default)]
pub counters_total_suffix: bool,
/// If set true, all histograms will include the unit in their name as a suffix.
/// Ex: "_milliseconds".
#[builder(default = "false")]
#[builder(default)]
pub unit_suffix: bool,
/// If set to true, use f64 seconds for durations instead of u64 milliseconds
#[builder(default)]
pub use_seconds_for_durations: bool,
/// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
#[builder(default)]
pub histogram_bucket_overrides: HistogramBucketOverrides,
}

/// Allows overriding the bucket boundaries used by histogram metrics.
#[derive(Debug, Clone, Default)]
pub struct HistogramBucketOverrides {
/// Overrides where the key is the metric name and the value is the list of bucket boundaries.
/// The metric name will apply regardless of name prefixing, if any; i.e., the name acts like
/// `*metric_name`.
///
/// The string names of core's built-in histogram metrics are publicly available on the
/// `core::telemetry` module and the `client` crate.
///
/// See [here](https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/metrics/enum.Aggregation.html#variant.ExplicitBucketHistogram.field.boundaries)
/// for the exact meaning of boundaries.
pub overrides: HashMap<String, Vec<f64>>,
}

/// Control where logs go
Expand All @@ -102,7 +123,8 @@ pub enum Logger {
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
filter: String,
},
// Push logs to Lang. Can used with temporal_sdk_core::telemetry::CoreLogBufferedConsumer to buffer.
/// Push logs to Lang. Can be used with
/// temporal_sdk_core::telemetry::log_export::CoreLogBufferedConsumer to buffer.
Push {
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
filter: String,
Expand Down
26 changes: 26 additions & 0 deletions core-api/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,32 @@ mod otel_impls {
}
}

impl Gauge for metrics::Gauge<u64> {
    /// Records `value` on the underlying OTel gauge using the key/values carried by
    /// `attributes`.
    ///
    /// Only the `MetricAttributes::OTel` variant is accepted. Any other variant trips a
    /// `debug_assert!` and nothing is recorded.
    fn record(&self, value: u64, attributes: &MetricAttributes) {
        let MetricAttributes::OTel { kvs } = attributes else {
            debug_assert!(
                false,
                "Must use OTel attributes with an OTel metric implementation"
            );
            return;
        };
        self.record(value, kvs);
    }
}

impl GaugeF64 for metrics::Gauge<f64> {
    /// Records `value` on the underlying OTel gauge using the key/values carried by
    /// `attributes`.
    ///
    /// Only the `MetricAttributes::OTel` variant is accepted. Any other variant trips a
    /// `debug_assert!` and nothing is recorded.
    fn record(&self, value: f64, attributes: &MetricAttributes) {
        let MetricAttributes::OTel { kvs } = attributes else {
            debug_assert!(
                false,
                "Must use OTel attributes with an OTel metric implementation"
            );
            return;
        };
        self.record(value, kvs);
    }
}

impl Histogram for metrics::Histogram<u64> {
fn record(&self, value: u64, attributes: &MetricAttributes) {
if let MetricAttributes::OTel { kvs } = attributes {
Expand Down
2 changes: 2 additions & 0 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub struct WorkerConfig {
/// server-side. Note that this only takes effect upon an activity poll request. If multiple
/// workers on the same queue have different values set, they will thrash with the last poller
/// winning.
///
/// Setting this to a nonzero value will also disable eager activity execution.
#[builder(default)]
pub max_task_queue_activities_per_second: Option<f64>,

Expand Down
8 changes: 4 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ itertools = "0.13"
lru = "0.12"
mockall = "0.13"
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true }
opentelemetry-prometheus = { version = "0.17", optional = true }
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true }
opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls"], optional = true }
opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true }
parking_lot = { version = "0.12", features = ["send_guard"] }
pid = "4.0"
pin-project = "1.0"
Expand All @@ -61,7 +61,7 @@ siphasher = "1.0"
slotmap = "1.0"
sysinfo = { version = "0.32", default-features = false, features = ["system"] }
tar = { version = "0.4", optional = true }
thiserror = "1.0"
thiserror = { workspace = true }
tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process"] }
tokio-util = { version = "0.7", features = ["io", "io-util"] }
tokio-stream = "0.1"
Expand Down
82 changes: 66 additions & 16 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::{
advance_fut, job_assert, prost_dur,
test_help::{
build_fake_worker, build_mock_pollers, canned_histories, gen_assert_and_reply,
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_worker, poll_and_reply,
single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, MocksHolder,
QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_sdk_cfg, mock_worker,
poll_and_reply, single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs,
MocksHolder, QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
},
worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
ActivityHeartbeat, Worker,
Expand Down Expand Up @@ -45,17 +45,18 @@ use temporal_sdk_core_protos::{
},
temporal::api::{
command::v1::{command::Attributes, ScheduleActivityTaskCommandAttributes},
enums::v1::EventType,
enums::v1::{CommandType, EventType},
history::v1::{
history_event::Attributes as EventAttributes, ActivityTaskScheduledEventAttributes,
},
sdk::v1::UserMetadata,
workflowservice::v1::{
PollActivityTaskQueueResponse, RecordActivityTaskHeartbeatResponse,
RespondActivityTaskCanceledResponse, RespondActivityTaskCompletedResponse,
RespondActivityTaskFailedResponse, RespondWorkflowTaskCompletedResponse,
},
},
TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE,
TestHistoryBuilder, DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE,
};
use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, TestWorker};
use tokio::{join, sync::Barrier, time::sleep};
Expand Down Expand Up @@ -633,11 +634,11 @@ async fn max_tq_acts_set_passed_to_poll_properly() {
worker.poll_activity_task().await.unwrap();
}

/// This test doesn't test the real worker config since [mock_worker] bypasses the worker
/// constructor, [mock_worker] will not pass an activity poller to the worker when
/// `no_remote_activities` is set to `true`.
#[rstest::rstest]
#[tokio::test]
async fn no_eager_activities_requested_when_worker_options_disable_remote_activities() {
async fn no_eager_activities_requested_when_worker_options_disable_it(
#[values("no_remote", "throttle")] reason: &'static str,
) {
let wfid = "fake_wf_id";
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
Expand All @@ -648,7 +649,6 @@ async fn no_eager_activities_requested_when_worker_options_disable_remote_activi
t.add_full_wf_task();
t.add_workflow_execution_completed();
let num_eager_requested = Arc::new(AtomicUsize::new(0));
// Clone it to move into the callback below
let num_eager_requested_clone = num_eager_requested.clone();

let mut mock = mock_workflow_client();
Expand Down Expand Up @@ -677,14 +677,13 @@ async fn no_eager_activities_requested_when_worker_options_disable_remote_activi
})
});
let mut mock = single_hist_mock_sg(wfid, t, [1], mock, true);
let mut mock_poller = mock_manual_poller();
mock_poller
.expect_poll()
.returning(|| futures_util::future::pending().boxed());
mock.set_act_poller(Box::new(mock_poller));
mock.worker_cfg(|wc| {
wc.max_cached_workflows = 2;
wc.no_remote_activities = true;
if reason == "no_remote" {
wc.no_remote_activities = true;
} else {
wc.max_task_queue_activities_per_second = Some(1.0);
}
});
let core = mock_worker(mock);

Expand Down Expand Up @@ -1187,3 +1186,54 @@ async fn activities_must_be_flushed_to_server_on_shutdown(#[values(true, false)]
};
join!(shutdown_task, complete_task);
}

/// Checks that a `summary` supplied through `ActivityOptions` is forwarded as the `summary` of
/// the `user_metadata` on the resulting `ScheduleActivityTask` command.
#[tokio::test]
async fn pass_activity_summary_to_metadata() {
    let history = canned_histories::single_activity("1");
    let mut poll_cfg = MockPollCfg::from_hist_builder(history);
    let wf_id = poll_cfg.hists[0].wf_id.clone();
    let wf_type = DEFAULT_WORKFLOW_TYPE;
    let summary_metadata = Some(UserMetadata {
        summary: Some(b"activity summary".into()),
        details: None,
    });
    poll_cfg.completion_asserts_from_expectations(|mut asserts| {
        asserts
            // First completion: exactly one command, which schedules the activity and carries
            // the summary as user metadata.
            .then(move |wft| {
                assert_eq!(wft.commands.len(), 1);
                let scheduled = &wft.commands[0];
                assert_eq!(scheduled.command_type(), CommandType::ScheduleActivityTask);
                assert_eq!(scheduled.user_metadata, summary_metadata);
            })
            // Second completion: exactly one command, which completes the workflow.
            .then(move |wft| {
                assert_eq!(wft.commands.len(), 1);
                let completed = &wft.commands[0];
                assert_eq!(
                    completed.command_type(),
                    CommandType::CompleteWorkflowExecution
                );
            });
    });

    let mut worker = mock_sdk_cfg(poll_cfg, |_| {});
    worker.register_wf(wf_type, |ctx: WfContext| async move {
        let options = ActivityOptions {
            activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
            summary: Some("activity summary".to_string()),
            ..Default::default()
        };
        ctx.activity(options).await;
        Ok(().into())
    });
    worker
        .submit_wf(
            wf_id.to_owned(),
            wf_type.to_owned(),
            vec![],
            WorkflowOptions::default(),
        )
        .await
        .unwrap();
    worker.run_until_done().await.unwrap();
}
Loading

0 comments on commit 06c326b

Please sign in to comment.