Skip to content

Commit

Permalink
Replace uuids by SlotMap keys
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal committed Nov 3, 2023
1 parent cbb18bb commit 62db6cf
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 105 deletions.
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ opentelemetry = { workspace = true, features = ["metrics"] }
parking_lot = "0.12"
prost-types = "0.11"
rand = "0.8.3"
slotmap = "1.0"
thiserror = "1.0"
tokio = "1.1"
tonic = { workspace = true, features = ["tls", "tls-roots"] }
Expand Down
2 changes: 1 addition & 1 deletion client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use temporal_sdk_core_protos::temporal::api::{
},
};
pub use tonic;
pub use worker_registry::{Slot, SlotManager, SlotProvider, WorkerRegistry};
pub use worker_registry::{Slot, SlotManager, SlotProvider, WorkerKey, WorkerRegistry};
pub use workflow_handle::{WorkflowExecutionInfo, WorkflowExecutionResult};

use crate::{
Expand Down
139 changes: 53 additions & 86 deletions client/src/worker_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
//! after reserving a slot, directly forwards a WFT to a local worker.
use parking_lot::RwLock;
use slotmap::SlotMap;
use std::collections::{hash_map::Entry::Vacant, HashMap};

use temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;

slotmap::new_key_type! {
/// Registration key for a worker
pub struct WorkerKey;
}

/// This trait is implemented by an object associated with a worker, which provides WFT processing slots.
#[cfg_attr(test, mockall::automock)]
pub trait SlotProvider: std::fmt::Debug {
/// A unique identifier for the worker.
fn id(&self) -> &str;
/// The namespace for the WFTs that it can process.
fn namespace(&self) -> &str;
/// The task queue this provider listens to.
Expand All @@ -35,9 +39,9 @@ pub trait Slot {
#[cfg_attr(test, mockall::automock)]
pub trait WorkerRegistry {
/// Register a local worker that can provide WFT processing slots.
fn register(&self, provider: Box<dyn SlotProvider + Send + Sync>);
fn register(&self, provider: Box<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey>;
/// Unregister a provider, typically when its worker starts shutdown.
fn unregister(&self, id: &str);
fn unregister(&self, id: WorkerKey);
}

#[derive(PartialEq, Eq, Hash, Debug, Clone)]
Expand All @@ -63,15 +67,15 @@ struct SlotManagerImpl {
/// Maps keys, i.e., namespace#task_queue, to provider.
providers: HashMap<SlotKey, Box<dyn SlotProvider + Send + Sync>>,
/// Maps ids to keys in `providers`.
index: HashMap<String, SlotKey>,
index: SlotMap<WorkerKey, SlotKey>,
}

impl SlotManagerImpl {
/// Factory method.
fn new() -> Self {
Self {
index: HashMap::new(),
providers: HashMap::new(),
index: Default::default(),
providers: Default::default(),
}
}

Expand All @@ -89,23 +93,21 @@ impl SlotManagerImpl {
None
}

fn register(&mut self, provider: Box<dyn SlotProvider + Send + Sync>) {
let id = provider.id();
if let Vacant(e) = self.index.entry(id.to_string()) {
let key = SlotKey::new(
provider.namespace().to_string(),
provider.task_queue().to_string(),
);
if let Vacant(p) = self.providers.entry(key.clone()) {
p.insert(provider);
e.insert(key);
} else {
warn!("Ignoring registration for worker {id} in bucket {key:?}.");
}
fn register(&mut self, provider: Box<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey> {
let key = SlotKey::new(
provider.namespace().to_string(),
provider.task_queue().to_string(),
);
if let Vacant(p) = self.providers.entry(key.clone()) {
p.insert(provider);
Some(self.index.insert(key))
} else {
warn!("Ignoring registration for worker in bucket {key:?}.");
None
}
}

fn unregister(&mut self, id: &str) {
fn unregister(&mut self, id: WorkerKey) {
if let Some(key) = self.index.remove(id) {
self.providers.remove(&key);
}
Expand Down Expand Up @@ -152,11 +154,11 @@ impl SlotManager {
}

impl WorkerRegistry for SlotManager {
fn register(&self, provider: Box<dyn SlotProvider + Send + Sync>) {
fn register(&self, provider: Box<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey> {
self.manager.write().register(provider)
}

fn unregister(&self, id: &str) {
fn unregister(&self, id: WorkerKey) {
self.manager.write().unregister(id)
}
}
Expand All @@ -178,7 +180,6 @@ mod tests {
}

fn new_mock_provider(
id: String,
namespace: String,
task_queue: String,
with_error: bool,
Expand All @@ -194,32 +195,21 @@ mod tests {
Some(new_mock_slot(with_error))
}
});
mock_provider.expect_id().return_const(id);
mock_provider.expect_namespace().return_const(namespace);
mock_provider.expect_task_queue().return_const(task_queue);
mock_provider
}

#[test]
fn registry_respects_registration_order() {
let mock_provider1 = new_mock_provider(
"some_slots_id".to_string(),
"foo".to_string(),
"bar_q".to_string(),
false,
false,
);
let mock_provider2 = new_mock_provider(
"no_slots_id".to_string(),
"foo".to_string(),
"bar_q".to_string(),
false,
true,
);
let mock_provider1 =
new_mock_provider("foo".to_string(), "bar_q".to_string(), false, false);
let mock_provider2 = new_mock_provider("foo".to_string(), "bar_q".to_string(), false, true);

let manager = SlotManager::new();
manager.register(Box::new(mock_provider1));
manager.register(Box::new(mock_provider2));
let some_slots = manager.register(Box::new(mock_provider1));
let no_slots = manager.register(Box::new(mock_provider2));
assert!(no_slots.is_none());

let mut found = 0;
for _ in 0..10 {
Expand All @@ -233,26 +223,16 @@ mod tests {
assert_eq!(found, 10);
assert_eq!((1, 1), manager.num_providers());

manager.unregister("some_slots_id");
manager.unregister(some_slots.unwrap());
assert_eq!((0, 0), manager.num_providers());

let mock_provider1 = new_mock_provider(
"some_slots_id".to_string(),
"foo".to_string(),
"bar_q".to_string(),
false,
false,
);
let mock_provider2 = new_mock_provider(
"no_slots_id".to_string(),
"foo".to_string(),
"bar_q".to_string(),
false,
true,
);
let mock_provider1 =
new_mock_provider("foo".to_string(), "bar_q".to_string(), false, false);
let mock_provider2 = new_mock_provider("foo".to_string(), "bar_q".to_string(), false, true);

manager.register(Box::new(mock_provider2));
manager.register(Box::new(mock_provider1));
let no_slots = manager.register(Box::new(mock_provider2));
let some_slots = manager.register(Box::new(mock_provider1));
assert!(some_slots.is_none());

let mut not_found = 0;
for _ in 0..10 {
Expand All @@ -265,44 +245,31 @@ mod tests {
}
assert_eq!(not_found, 10);
assert_eq!((1, 1), manager.num_providers());
manager.unregister(no_slots.unwrap());
assert_eq!((0, 0), manager.num_providers());
}

#[test]
fn registry_keeps_one_provider_per_namespace() {
let manager = SlotManager::new();
let mut worker_keys = vec![];
for i in 0..10 {
let id = format!("myId{}", i);
let namespace = format!("myId{}", i % 3);
let mock_provider = new_mock_provider(id, namespace, "bar_q".to_string(), false, false);
manager.register(Box::new(mock_provider));
let mock_provider = new_mock_provider(namespace, "bar_q".to_string(), false, false);
worker_keys.push(manager.register(Box::new(mock_provider)));
}
assert_eq!((3, 3), manager.num_providers());

for i in 0..10 {
let id = format!("myId{}", i);
manager.unregister(&id);
}
assert_eq!((0, 0), manager.num_providers());
}

#[test]
fn registry_is_idempotent() {
let manager = SlotManager::new();
for _ in 0..10 {
let mock_provider = new_mock_provider(
"same_id".to_string(),
"ns".to_string(),
"bar_q".to_string(),
false,
false,
);
manager.register(Box::new(mock_provider));
}
assert_eq!((1, 1), manager.num_providers());

for _ in 0..10 {
manager.unregister("same_id");
}
let count = worker_keys
.iter()
.filter(|key| key.is_some())
.fold(0, |count, key| {
manager.unregister(key.unwrap());
// Should be idempotent
manager.unregister(key.unwrap());
count + 1
});
assert_eq!(3, count);
assert_eq!((0, 0), manager.num_providers());
}
}
17 changes: 9 additions & 8 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub(crate) use activities::{
};
pub(crate) use workflow::{wft_poller::new_wft_poller, LEGACY_QUERY_ID};

use temporal_client::WorkerKey;

use crate::{
abstractions::{dbg_panic, MeteredSemaphore},
errors::CompleteWfError,
Expand Down Expand Up @@ -79,9 +81,8 @@ use {
pub struct Worker {
config: WorkerConfig,
wf_client: Arc<dyn WorkerClient>,

/// A unique identifier for this worker
uuid: String,
/// Registration key for this worker
worker_key: Option<WorkerKey>,
/// Manages all workflows and WFT processing
workflows: Workflows,
/// Manages activity tasks for this worker/task queue
Expand Down Expand Up @@ -177,7 +178,9 @@ impl WorkerTrait for Worker {
}
self.shutdown_token.cancel();
// First, disable Eager Workflow Start
self.wf_client.workers().unregister(&self.uuid);
if let Some(key) = self.worker_key {
self.wf_client.workers().unregister(key);
}
// Second, we want to stop polling of both activity and workflow tasks
if let Some(atm) = self.at_task_mgr.as_ref() {
atm.initiate_shutdown();
Expand Down Expand Up @@ -362,17 +365,15 @@ impl Worker {
info!("Activity polling is disabled for this worker");
};
let la_sink = LAReqSink::new(local_act_mgr.clone(), config.wf_state_inputs.clone());
let uuid = uuid::Uuid::new_v4().simple().to_string();
let provider = SlotProvider::new(
uuid.clone(),
config.namespace.clone(),
config.task_queue.clone(),
wft_semaphore.clone(),
external_wft_tx,
);
client.workers().register(Box::new(provider));
let worker_key = client.workers().register(Box::new(provider));
Self {
uuid,
worker_key,
wf_client: client.clone(),
workflows: Workflows::new(
build_wf_basics(
Expand Down
11 changes: 1 addition & 10 deletions core/src/worker/slot_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ impl SlotTrait for Slot {
}

#[derive(derive_more::DebugCustom)]
#[debug(fmt = "SlotProvider {{ id: {id}, namespace:{namespace}, task_queue: {task_queue} }}")]
#[debug(fmt = "SlotProvider {{ namespace:{namespace}, task_queue: {task_queue} }}")]
pub struct SlotProvider {
id: String,
namespace: String,
task_queue: String,
wft_semaphore: Arc<MeteredSemaphore>,
Expand All @@ -54,14 +53,12 @@ pub struct SlotProvider {

impl SlotProvider {
pub(crate) fn new(
id: String,
namespace: String,
task_queue: String,
wft_semaphore: Arc<MeteredSemaphore>,
external_wft_tx: WFTStreamSender,
) -> Self {
Self {
id,
namespace,
task_queue,
wft_semaphore,
Expand All @@ -71,9 +68,6 @@ impl SlotProvider {
}

impl SlotProviderTrait for SlotProvider {
fn id(&self) -> &str {
&self.id
}
fn namespace(&self) -> &str {
&self.namespace
}
Expand Down Expand Up @@ -120,7 +114,6 @@ mod tests {
let (external_wft_tx, mut external_wft_rx) = unbounded_channel();

let provider = SlotProvider::new(
"my_id".to_string(),
"my_namespace".to_string(),
"my_queue".to_string(),
wft_semaphore,
Expand All @@ -146,7 +139,6 @@ mod tests {
|_, _| {},
));
let provider = SlotProvider::new(
"my_id".to_string(),
"my_namespace".to_string(),
"my_queue".to_string(),
wft_semaphore,
Expand All @@ -168,7 +160,6 @@ mod tests {
let wft_semaphore = wft_semaphore.clone();
let (external_wft_tx, _) = unbounded_channel();
let provider = SlotProvider::new(
"my_id".to_string(),
"my_namespace".to_string(),
"my_queue".to_string(),
wft_semaphore.clone(),
Expand Down

0 comments on commit 62db6cf

Please sign in to comment.