From c949218e2af8862aae234781ae8cf385dc6004a0 Mon Sep 17 00:00:00 2001 From: David Estes Date: Mon, 7 Oct 2024 13:43:19 -0600 Subject: [PATCH 1/5] fix: subscribe to server shutdown signal in insert task previously relied on http server shutdown closing channel exclusively, now we listen to the shutdown signal internally --- api/src/server.rs | 37 +++++++++++++++++++++++++------------ one/src/daemon.rs | 1 + 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/api/src/server.rs b/api/src/server.rs index cb2ab698..8b3ebd98 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -40,6 +40,7 @@ use recon::Key; use swagger::{ApiError, ByteArray}; #[cfg(not(target_env = "msvc"))] use tikv_jemalloc_ctl::epoch; +use tokio::sync::broadcast; use tracing::{instrument, Level}; use crate::server::event::event_id_from_car; @@ -362,11 +363,17 @@ where I: InterestService, M: EventService + 'static, { - pub fn new(node_id: NodeId, network: Network, interest: I, model: Arc) -> Self { + pub fn new( + node_id: NodeId, + network: Network, + interest: I, + model: Arc, + shutdown_signal: broadcast::Receiver<()>, + ) -> Self { let (tx, event_rx) = tokio::sync::mpsc::channel::(1024); let event_store = model.clone(); - let handle = Self::start_insert_task(event_store, event_rx, node_id); + let handle = Self::start_insert_task(event_store, event_rx, node_id, shutdown_signal); let insert_task = Arc::new(InsertTask { _handle: handle, tx, @@ -391,6 +398,7 @@ where event_store: Arc, mut event_rx: tokio::sync::mpsc::Receiver, node_id: NodeId, + mut shutdown_signal: broadcast::Receiver<()>, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS)); @@ -399,7 +407,17 @@ where // rely on the channel depth for backpressure. the goal is to keep the queue close to empty // without processing one at a time. when we stop parsing the carfile in the store // i.e. validate before sending here and this is just an insert, we may want to process more at once. + let mut shutdown = false; loop { + let should_exit = shutdown || event_rx.is_closed(); + // make sure the events queue doesn't get too deep when we're under heavy load + if events.len() >= EVENT_INSERT_QUEUE_SIZE || should_exit { + Self::process_events(&mut events, &event_store, node_id).await; + } + if should_exit { + tracing::info!("Shutting down insert task."); + return; + } let mut buf = Vec::with_capacity(EVENTS_TO_RECEIVE); tokio::select! 
{ _ = interval.tick() => { @@ -410,16 +428,11 @@ where events.extend(buf); } } - } - let shutdown = event_rx.is_closed(); - // make sure the events queue doesn't get too deep when we're under heavy load - if events.len() >= EVENT_INSERT_QUEUE_SIZE || shutdown { - Self::process_events(&mut events, &event_store, node_id).await; - } - if shutdown { - tracing::info!("Shutting down insert task."); - return; - } + _ = shutdown_signal.recv() => { + tracing::debug!("Insert many task got shutdown signal"); + shutdown = true; + } + }; } }) } diff --git a/one/src/daemon.rs b/one/src/daemon.rs index 7b39c0bc..15cfd37b 100644 --- a/one/src/daemon.rs +++ b/one/src/daemon.rs @@ -559,6 +559,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { network, interest_api_store, Arc::new(model_api_store), + shutdown_signal.resubscribe(), ); if opts.authentication { ceramic_server.with_authentication(true); From 276f360a3ebd5c6c547cd336f95ef31f8c2be42a Mon Sep 17 00:00:00 2001 From: David Estes Date: Mon, 7 Oct 2024 15:19:02 -0600 Subject: [PATCH 2/5] chore: fix tests that need shutdown handler to compile --- api/src/tests.rs | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/api/src/tests.rs b/api/src/tests.rs index 3b387d37..7f2e7946 100644 --- a/api/src/tests.rs +++ b/api/src/tests.rs @@ -166,6 +166,21 @@ pub fn mock_get_unsigned_init_event(mock_store: &mut MockEventStoreTest) { .return_once(move |_| Ok(Some(decode_multibase_str(UNSIGNED_INIT_EVENT_PAYLOAD)))); } +/// Wrapper around server initialization that handles creating the shutdown handler +fn create_test_server( + node_id: NodeId, + network: Network, + interest: I, + model: Arc, +) -> Server +where + I: InterestService, + M: EventService + 'static, +{ + let (_, rx) = tokio::sync::broadcast::channel(1); + Server::new(node_id, network, interest, model, rx) +} + #[test(tokio::test)] async fn create_event() { let node_id = NodeId::random().0; @@ -195,7 +210,7 @@ async fn create_event() { .map(|v| EventInsertResult::new_ok(v.key.clone())) .collect()) }); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .events_post( models::EventData { @@ -241,7 +256,7 @@ async fn create_event_fails() { }) .collect()) }); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .events_post( models::EventData { @@ -291,7 +306,7 @@ async fn register_interest_sort_value() { .times(1) .returning(|_| Ok(true)); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let interest = models::Interest { sep: "model".to_string(), sep_value: model.to_owned(), @@ -313,7 +328,7 @@ async fn register_interest_sort_value_bad_request() { // Setup mock expectations let mock_interest = MockAccessInterestStoreTest::new(); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let interest = models::Interest { sep: "model".to_string(), sep_value: model.to_owned(), @@ -361,7 +376,8 @@ async 
fn register_interest_sort_value_controller() { .times(1) .returning(|__| Ok(true)); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .interests_sort_key_sort_value_post( "model".to_string(), @@ -413,7 +429,7 @@ async fn register_interest_value_controller_stream() { .times(1) .returning(|__| Ok(true)); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .interests_sort_key_sort_value_post( "model".to_string(), @@ -481,7 +497,7 @@ async fn get_interests() { }); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .experimental_interests_get(None, &Context) .await @@ -569,7 +585,7 @@ async fn get_interests_for_peer() { }); let mock_event_store = MockEventStoreTest::new(); - let server = Server::new( + let server = create_test_server( node_id_b, network, mock_interest, @@ -635,7 +651,7 @@ async fn get_events_for_interest_range() { ) .times(1) .returning(move |_, _, _| Ok(vec![(cid, vec![])])); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let resp = server .experimental_events_sep_sep_value_get( "model".to_string(), @@ -684,7 +700,7 @@ async fn test_events_event_id_get_by_event_id_success() { .times(1) .returning(move |_| Ok(Some(event_data.clone()))); let mock_interest = MockAccessInterestStoreTest::new(); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let result = server.events_event_id_get(event_id_str, &Context).await; let EventsEventIdGetResponse::Success(event) = result.unwrap() else { panic!("Expected EventsEventIdGetResponse::Success but got another variant"); @@ -712,7 +728,7 @@ async fn test_events_event_id_get_by_cid_success() { .times(1) .returning(move |_| Ok(Some(event_data.clone()))); let mock_interest = MockAccessInterestStoreTest::new(); - let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store)); + let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store)); let result = server .events_event_id_get(event_cid.to_string(), &Context) .await; From c90f9c00d0baea23b4bc52ca67714794a301583c Mon Sep 17 00:00:00 2001 From: David Estes Date: Mon, 7 Oct 2024 15:51:43 -0600 Subject: [PATCH 3/5] refactor: adjust insert task loop Moves all shutdown logic to one branch, which now tries to pull any pending events from the channel before exiting rather than use channel openness as proxy for shutdown. The select! branches no longer do any async logic and just set variables for the outer loop to use, as it avoids any subtle cancel safety issues with &mut self. 
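For reference, the pattern described above (select! arms that only record what
happened, with the batching/flush work done outside the select!) reduces to a
small self-contained sketch. Everything below is illustrative only -- flush,
BATCH_SIZE and insert_loop are placeholder names, not identifiers from this
crate -- but it shows why the arms stay synchronous: the only await that
touches the accumulated batch runs outside the select!, so a cancelled arm can
never leave a half-processed batch behind.

    // Sketch of the loop shape; assumes tokio with the "full" feature set.
    use std::time::Duration;
    use tokio::sync::{broadcast, mpsc};

    const BATCH_SIZE: usize = 256;

    // Stand-in for Self::process_events(..): handle and clear the batch.
    async fn flush(batch: &mut Vec<u64>) {
        println!("flushing {} items", batch.len());
        batch.clear();
    }

    async fn insert_loop(mut rx: mpsc::Receiver<u64>, mut shutdown_rx: broadcast::Receiver<()>) {
        let mut interval = tokio::time::interval(Duration::from_millis(100));
        let mut batch = Vec::new();
        let mut shutdown = false;
        loop {
            let mut buf = Vec::with_capacity(BATCH_SIZE);
            let mut tick = false;
            // Arms only set flags or extend buffers; no shared-state awaits here.
            tokio::select! {
                _ = interval.tick() => tick = true,
                n = rx.recv_many(&mut buf, BATCH_SIZE) => {
                    if n > 0 {
                        batch.extend(buf);
                    }
                }
                _ = shutdown_rx.recv() => shutdown = true,
            }
            if shutdown {
                // Drain anything still queued, flush one last time, then exit.
                while let Ok(item) = rx.try_recv() {
                    batch.push(item);
                }
                flush(&mut batch).await;
                return;
            }
            if tick || batch.len() >= BATCH_SIZE {
                flush(&mut batch).await;
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(1024);
        let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
        let worker = tokio::spawn(insert_loop(rx, shutdown_rx));
        for i in 0..10u64 {
            tx.send(i).await.unwrap();
        }
        shutdown_tx.send(()).unwrap();
        worker.await.unwrap();
    }
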
--- api/src/server.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/api/src/server.rs b/api/src/server.rs index 8b3ebd98..9b532281 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -407,21 +407,19 @@ where // rely on the channel depth for backpressure. the goal is to keep the queue close to empty // without processing one at a time. when we stop parsing the carfile in the store // i.e. validate before sending here and this is just an insert, we may want to process more at once. - let mut shutdown = false; + let (mut shutdown, mut process_early) = (false, false); loop { - let should_exit = shutdown || event_rx.is_closed(); - // make sure the events queue doesn't get too deep when we're under heavy load - if events.len() >= EVENT_INSERT_QUEUE_SIZE || should_exit { + // process events at the interval or when we're under heavy load. + // we do it outside the select! to avoid any cancel safety issues + // even though we should be okay since we're using tokio channels/intervals + if events.len() >= EVENT_INSERT_QUEUE_SIZE || process_early { Self::process_events(&mut events, &event_store, node_id).await; } - if should_exit { - tracing::info!("Shutting down insert task."); - return; - } + let mut buf = Vec::with_capacity(EVENTS_TO_RECEIVE); tokio::select! { _ = interval.tick() => { - Self::process_events(&mut events, &event_store, node_id).await; + process_early = true; } val = event_rx.recv_many(&mut buf, EVENTS_TO_RECEIVE) => { if val > 0 { @@ -433,6 +431,21 @@ where shutdown = true; } }; + if shutdown { + tracing::info!("Shutting down insert task after processing current batch"); + if !event_rx.is_empty() { + let remaining_event_cnt = event_rx.len(); + let mut buf = Vec::with_capacity(remaining_event_cnt); + tracing::info!( + "Receiving {remaining_event_cnt} remaining events for insert task before exiting" + ); + event_rx.recv_many(&mut buf, event_rx.len()).await; + events.extend(buf); + } + + Self::process_events(&mut events, &event_store, node_id).await; + return; + } } }) } From ed7e64869d04f0934f42877b02d3cc14dc83a2de Mon Sep 17 00:00:00 2001 From: David Estes Date: Mon, 7 Oct 2024 16:01:28 -0600 Subject: [PATCH 4/5] fix: reset process_early every loop instead moved all processing after select! so the loop is now 1) wait for something to happen (interval, new events, shutdown) and 2) handle shutdown or process events if appropriate 3) repeat loop --- api/src/server.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/api/src/server.rs b/api/src/server.rs index 9b532281..45ad1b44 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -407,16 +407,10 @@ where // rely on the channel depth for backpressure. the goal is to keep the queue close to empty // without processing one at a time. when we stop parsing the carfile in the store // i.e. validate before sending here and this is just an insert, we may want to process more at once. - let (mut shutdown, mut process_early) = (false, false); + let mut shutdown = false; loop { - // process events at the interval or when we're under heavy load. - // we do it outside the select! to avoid any cancel safety issues - // even though we should be okay since we're using tokio channels/intervals - if events.len() >= EVENT_INSERT_QUEUE_SIZE || process_early { - Self::process_events(&mut events, &event_store, node_id).await; - } - let mut buf = Vec::with_capacity(EVENTS_TO_RECEIVE); + let mut process_early = false; tokio::select! 
{ _ = interval.tick() => { process_early = true; @@ -446,6 +440,12 @@ where Self::process_events(&mut events, &event_store, node_id).await; return; } + // process events at the interval or when we're under heavy load. + // we do it outside the select! to avoid any cancel safety issues + // even though we should be okay since we're using tokio channels/intervals + if events.len() >= EVENT_INSERT_QUEUE_SIZE || process_early { + Self::process_events(&mut events, &event_store, node_id).await; + } } }) } From 4ddf43f20e434e22aca9a78430ad6d4bfbb94781 Mon Sep 17 00:00:00 2001 From: David Estes Date: Wed, 9 Oct 2024 17:40:11 -0600 Subject: [PATCH 5/5] chore: pr comments --- api/src/server.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/api/src/server.rs b/api/src/server.rs index 45ad1b44..ca0dd25f 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -410,10 +410,10 @@ where let mut shutdown = false; loop { let mut buf = Vec::with_capacity(EVENTS_TO_RECEIVE); - let mut process_early = false; + let mut process_interval = false; tokio::select! { _ = interval.tick() => { - process_early = true; + process_interval = true; } val = event_rx.recv_many(&mut buf, EVENTS_TO_RECEIVE) => { if val > 0 { @@ -429,6 +429,8 @@ where tracing::info!("Shutting down insert task after processing current batch"); if !event_rx.is_empty() { let remaining_event_cnt = event_rx.len(); + // the buffer above gets moved into the select! even though we don't + // follow that branch in this case, so we create a new buffer here let mut buf = Vec::with_capacity(remaining_event_cnt); tracing::info!( "Receiving {remaining_event_cnt} remaining events for insert task before exiting" @@ -443,7 +445,7 @@ where // process events at the interval or when we're under heavy load. // we do it outside the select! to avoid any cancel safety issues // even though we should be okay since we're using tokio channels/intervals - if events.len() >= EVENT_INSERT_QUEUE_SIZE || process_early { + if events.len() >= EVENT_INSERT_QUEUE_SIZE || process_interval { Self::process_events(&mut events, &event_store, node_id).await; } }
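
A note on the daemon side of this series: the shutdown_signal.resubscribe()
argument added in patch 1 relies on tokio's broadcast channel, where every
receiver (including ones created with resubscribe) observes a later send. The
sketch below is hypothetical wiring, not the real daemon.rs -- run_component
and the component names are made up -- but it shows the fan-out shape: one
broadcast sender owned by the daemon, one resubscribed receiver handed to each
long-running task, and a single send on shutdown.

    // Hypothetical fan-out of a broadcast shutdown signal; not the crate's code.
    // Assumes tokio with the "full" feature set (signal + sync + macros + rt).
    use tokio::sync::broadcast;

    // Placeholder for a long-running subsystem (API server, insert task, ...).
    async fn run_component(name: &'static str, mut shutdown: broadcast::Receiver<()>) {
        // A real component would select! over its work and this receiver,
        // as the insert task in this series does.
        let _ = shutdown.recv().await;
        println!("{name}: shutting down");
    }

    #[tokio::main]
    async fn main() {
        let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);

        // Each task gets its own receiver; resubscribe() only sees messages
        // sent after it is called, which is fine for a shutdown signal.
        let api = tokio::spawn(run_component("api", shutdown_rx.resubscribe()));
        let insert = tokio::spawn(run_component("insert", shutdown_rx.resubscribe()));

        // Trigger shutdown on Ctrl-C and wait for the tasks to drain and exit.
        tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
        let _ = shutdown_tx.send(());
        let _ = tokio::join!(api, insert);
    }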