fix: subscribe to server shutdown signal in insert task #553

Merged 5 commits on Oct 17, 2024
48 changes: 38 additions & 10 deletions api/src/server.rs
@@ -40,6 +40,7 @@ use recon::Key;
use swagger::{ApiError, ByteArray};
#[cfg(not(target_env = "msvc"))]
use tikv_jemalloc_ctl::epoch;
use tokio::sync::broadcast;
use tracing::{instrument, Level};

use crate::server::event::event_id_from_car;
@@ -362,11 +363,17 @@ where
I: InterestService,
M: EventService + 'static,
{
pub fn new(node_id: NodeId, network: Network, interest: I, model: Arc<M>) -> Self {
pub fn new(
node_id: NodeId,
network: Network,
interest: I,
model: Arc<M>,
shutdown_signal: broadcast::Receiver<()>,
) -> Self {
let (tx, event_rx) = tokio::sync::mpsc::channel::<EventInsert>(1024);
let event_store = model.clone();

let handle = Self::start_insert_task(event_store, event_rx, node_id);
let handle = Self::start_insert_task(event_store, event_rx, node_id, shutdown_signal);
let insert_task = Arc::new(InsertTask {
_handle: handle,
tx,
@@ -391,6 +398,7 @@ where
event_store: Arc<M>,
mut event_rx: tokio::sync::mpsc::Receiver<EventInsert>,
node_id: NodeId,
mut shutdown_signal: broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(FLUSH_INTERVAL_MS));
@@ -399,27 +407,47 @@
// rely on the channel depth for backpressure. the goal is to keep the queue close to empty
// without processing one at a time. when we stop parsing the carfile in the store
// i.e. validate before sending here and this is just an insert, we may want to process more at once.
let mut shutdown = false;
loop {
let mut buf = Vec::with_capacity(EVENTS_TO_RECEIVE);
let mut process_interval = false;
tokio::select! {
_ = interval.tick() => {
Self::process_events(&mut events, &event_store, node_id).await;
process_interval = true;
}
val = event_rx.recv_many(&mut buf, EVENTS_TO_RECEIVE) => {
if val > 0 {
events.extend(buf);
}
}
}
let shutdown = event_rx.is_closed();
// make sure the events queue doesn't get too deep when we're under heavy load
if events.len() >= EVENT_INSERT_QUEUE_SIZE || shutdown {
Self::process_events(&mut events, &event_store, node_id).await;
}
_ = shutdown_signal.recv() => {
tracing::debug!("Insert many task got shutdown signal");
shutdown = true;
}
};
if shutdown {
tracing::info!("Shutting down insert task.");
tracing::info!("Shutting down insert task after processing current batch");
if !event_rx.is_empty() {
let remaining_event_cnt = event_rx.len();
// the buffer above gets moved into the select! even though we don't
// follow that branch in this case, so we create a new buffer here
let mut buf = Vec::with_capacity(remaining_event_cnt);
Review comment (Collaborator):

can we not just use the buf that was already allocated on line 412?

Reply (@dav1do, Contributor, Author, Oct 9, 2024):

Yeah, makes sense. We can resize and use it (or just use it, since recv_many might resize automatically - I'll verify). It's the last allocation before exiting, so I didn't think too closely about it.

Reply (@dav1do, Contributor, Author):

We can't use it, actually. It gets moved into the select! branch even though we don't follow that arm in this case. I added a comment to that effect. Expanding the macro might make it easier to see, but I didn't bother.

tracing::info!(
"Receiving {remaining_event_cnt} remaining events for insert task before exiting"
);
event_rx.recv_many(&mut buf, event_rx.len()).await;
events.extend(buf);
}

Self::process_events(&mut events, &event_store, node_id).await;
return;
}
// process events at the interval or when we're under heavy load.
// we do it outside the select! to avoid any cancel safety issues
// even though we should be okay since we're using tokio channels/intervals
if events.len() >= EVENT_INSERT_QUEUE_SIZE || process_interval {
Self::process_events(&mut events, &event_store, node_id).await;
}
}
})
}
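The drain-on-shutdown logic above can be sketched with a simplified, std-threads analogue: batch incoming events, treat channel closure as the shutdown signal, and flush whatever is still buffered before exiting. Everything below (`run_batching_worker`, `BATCH_SIZE`) is illustrative and not part of the PR, which uses tokio channels and a broadcast shutdown signal instead.

```rust
use std::sync::mpsc;
use std::thread;

// Simplified, std-threads analogue of the PR's insert task: accumulate
// events into batches and, once shutdown is observed, flush whatever is
// still queued so nothing is dropped.
fn run_batching_worker(rx: mpsc::Receiver<u32>) -> thread::JoinHandle<Vec<Vec<u32>>> {
    const BATCH_SIZE: usize = 3;
    thread::spawn(move || {
        let mut batches = Vec::new();
        let mut events = Vec::new();
        loop {
            match rx.recv() {
                Ok(ev) => {
                    events.push(ev);
                    if events.len() >= BATCH_SIZE {
                        // Batch is full: "process" it (here, just record it).
                        batches.push(std::mem::take(&mut events));
                    }
                }
                // recv errors once all senders are dropped: our stand-in
                // for the broadcast shutdown signal.
                Err(_) => break,
            }
        }
        // Flush the final partial batch, mirroring the PR's
        // "process current batch before exiting" behavior.
        if !events.is_empty() {
            batches.push(events);
        }
        batches
    })
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = run_batching_worker(rx);
    for ev in 0..7 {
        tx.send(ev).unwrap();
    }
    drop(tx); // closing the channel triggers shutdown + drain
    let batches = handle.join().unwrap();
    println!("{:?}", batches);
}
```

The final partial batch at the end is the point of the PR: events already queued when shutdown arrives are still processed rather than lost.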
38 changes: 27 additions & 11 deletions api/src/tests.rs
@@ -166,6 +166,21 @@ pub fn mock_get_unsigned_init_event(mock_store: &mut MockEventStoreTest) {
.return_once(move |_| Ok(Some(decode_multibase_str(UNSIGNED_INIT_EVENT_PAYLOAD))));
}

/// Wrapper around server initialization that handles creating the shutdown handler
fn create_test_server<C, I, M>(
node_id: NodeId,
network: Network,
interest: I,
model: Arc<M>,
) -> Server<C, I, M>
where
I: InterestService,
M: EventService + 'static,
{
let (_, rx) = tokio::sync::broadcast::channel(1);
Server::new(node_id, network, interest, model, rx)
}
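One detail of this helper worth noting: `let (_, rx) = tokio::sync::broadcast::channel(1)` drops the sender immediately, so in the tests the receiver observes a closed channel rather than a pending signal. The std mpsc sketch below (illustrative only, not the tokio API) shows the same dropped-sender behavior:

```rust
use std::sync::mpsc;

fn main() {
    // Mirror the test helper's `let (_, rx) = ...` pattern: the sender
    // is dropped before anything is ever sent.
    let (tx, rx) = mpsc::channel::<()>();
    drop(tx);
    // With no senders left, recv returns Err immediately instead of
    // blocking forever, so a test task waiting on it cannot hang.
    assert!(rx.recv().is_err());
    println!("receiver observed a closed channel");
}
```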

#[test(tokio::test)]
async fn create_event() {
let node_id = NodeId::random().0;
@@ -195,7 +210,7 @@ async fn create_event() {
.map(|v| EventInsertResult::new_ok(v.key.clone()))
.collect())
});
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));
let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.events_post(
models::EventData {
@@ -241,7 +256,7 @@ async fn create_event_fails() {
})
.collect())
});
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));
let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.events_post(
models::EventData {
@@ -291,7 +306,7 @@ async fn register_interest_sort_value() {
.times(1)
.returning(|_| Ok(true));
let mock_event_store = MockEventStoreTest::new();
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));
let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let interest = models::Interest {
sep: "model".to_string(),
sep_value: model.to_owned(),
@@ -313,7 +328,7 @@ async fn register_interest_sort_value_bad_request() {
// Setup mock expectations
let mock_interest = MockAccessInterestStoreTest::new();
let mock_event_store = MockEventStoreTest::new();
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));
let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let interest = models::Interest {
sep: "model".to_string(),
sep_value: model.to_owned(),
@@ -361,7 +376,8 @@ async fn register_interest_sort_value_controller() {
.times(1)
.returning(|__| Ok(true));
let mock_event_store = MockEventStoreTest::new();
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));

let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.interests_sort_key_sort_value_post(
"model".to_string(),
@@ -413,7 +429,7 @@ async fn register_interest_value_controller_stream() {
.times(1)
.returning(|__| Ok(true));
let mock_event_store = MockEventStoreTest::new();
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));
let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.interests_sort_key_sort_value_post(
"model".to_string(),
@@ -481,7 +497,7 @@ async fn get_interests() {
});

let mock_event_store = MockEventStoreTest::new();
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));
let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.experimental_interests_get(None, &Context)
.await
@@ -569,7 +585,7 @@ async fn get_interests_for_peer() {
});

let mock_event_store = MockEventStoreTest::new();
let server = Server::new(
let server = create_test_server(
node_id_b,
network,
mock_interest,
Expand Down Expand Up @@ -635,7 +651,7 @@ async fn get_events_for_interest_range() {
)
.times(1)
.returning(move |_, _, _| Ok(vec![(cid, vec![])]));
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));
let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let resp = server
.experimental_events_sep_sep_value_get(
"model".to_string(),
@@ -684,7 +700,7 @@ async fn test_events_event_id_get_by_event_id_success() {
.times(1)
.returning(move |_| Ok(Some(event_data.clone())));
let mock_interest = MockAccessInterestStoreTest::new();
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));
let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let result = server.events_event_id_get(event_id_str, &Context).await;
let EventsEventIdGetResponse::Success(event) = result.unwrap() else {
panic!("Expected EventsEventIdGetResponse::Success but got another variant");
@@ -712,7 +728,7 @@ async fn test_events_event_id_get_by_cid_success() {
.times(1)
.returning(move |_| Ok(Some(event_data.clone())));
let mock_interest = MockAccessInterestStoreTest::new();
let server = Server::new(node_id, network, mock_interest, Arc::new(mock_event_store));
let server = create_test_server(node_id, network, mock_interest, Arc::new(mock_event_store));
let result = server
.events_event_id_get(event_cid.to_string(), &Context)
.await;
1 change: 1 addition & 0 deletions one/src/daemon.rs
@@ -559,6 +559,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
network,
interest_api_store,
Arc::new(model_api_store),
shutdown_signal.resubscribe(),
);
if opts.authentication {
ceramic_server.with_authentication(true);
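In daemon.rs the server gets its own copy of the shutdown receiver via `shutdown_signal.resubscribe()`, so each long-running task can observe the same shutdown independently. The std-only sketch below illustrates the fan-out idea; the `ShutdownSignal` type is invented here for illustration and is not the tokio broadcast API.

```rust
use std::sync::mpsc;
use std::thread;

// Illustrative fan-out: every subscriber gets its own receiver, so one
// shutdown notification reaches all of them (roughly what resubscribing
// to a tokio broadcast channel achieves).
struct ShutdownSignal {
    senders: Vec<mpsc::Sender<()>>,
}

impl ShutdownSignal {
    fn new() -> Self {
        Self { senders: Vec::new() }
    }

    // Analogous in spirit to obtaining another broadcast receiver.
    fn subscribe(&mut self) -> mpsc::Receiver<()> {
        let (tx, rx) = mpsc::channel();
        self.senders.push(tx);
        rx
    }

    fn shutdown(&self) {
        for tx in &self.senders {
            let _ = tx.send(());
        }
    }
}

fn main() {
    let mut signal = ShutdownSignal::new();
    let handles: Vec<_> = (0..3)
        .map(|i| {
            let rx = signal.subscribe();
            thread::spawn(move || {
                rx.recv().unwrap(); // block until shutdown is signaled
                i
            })
        })
        .collect();
    signal.shutdown();
    let ids: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    println!("{:?}", ids);
}
```

Each worker blocks on its own receiver and all of them wake from the single `shutdown()` call, which is the property the daemon relies on when it resubscribes for the API server.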