Skip to content

Commit

Permalink
fix: handle double write of same data (#613)
Browse files Browse the repository at this point in the history
Prior to this change a write of the same data (keyed by the event id)
would cause an error in the batching logic of the write path. It was
assumed that double writes would not happen, so the channel used to report
the insert result was dropped, causing an error.

Now all channels are preserved and the result is sent to all.
  • Loading branch information
nathanielc authored Nov 21, 2024
1 parent 75fbb01 commit 7160752
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 10 deletions.
36 changes: 27 additions & 9 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,13 +469,17 @@ where
}

async fn process_events(events: &mut Vec<EventInsert>, event_store: &Arc<M>, node_id: NodeId) {
tracing::debug!(count = events.len(), "process_events");
if events.is_empty() {
return;
}
let mut oneshots = HashMap::with_capacity(events.len());
let mut items = Vec::with_capacity(events.len());
events.drain(..).for_each(|req: EventInsert| {
oneshots.insert(req.id.to_bytes(), req.tx);
oneshots
.entry(req.id.clone())
.or_insert(vec![])
.push(req.tx);
items.push(ApiItem::new(req.id, req.data));
});
tracing::trace!("calling insert many with {} items.", items.len());
Expand All @@ -484,12 +488,20 @@ where
tracing::debug!("insert many returned {} results.", results.len());
for result in results {
let id = result.id();
if let Some(tx) = oneshots.remove(&id.to_bytes()) {
if let Err(e) = tx.send(Ok(result)) {
if let Some(txs) = oneshots.get_mut(id) {
// Expect one result per oneshot channel
if let Some(tx) = txs.pop() {
if let Err(e) = tx.send(Ok(result.clone())) {
tracing::warn!(
"failed to send success response to api listener: {:?}",
e
);
}
} else {
tracing::warn!(
"failed to send success response to api listener: {:?}",
e
);
"no more channels to respond to API listener for duplicate event ID: {:?}",
id
);
}
} else {
tracing::warn!(
Expand All @@ -501,9 +513,15 @@ where
}
Err(e) => {
tracing::warn!("failed to insert events: {e}");
for tx in oneshots.into_values() {
if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}"))) {
tracing::warn!("failed to send failed response to api listener: {:?}", e);
for txs in oneshots.into_values() {
for tx in txs {
if let Err(e) = tx.send(Err(anyhow::anyhow!("Failed to insert event: {e}")))
{
tracing::warn!(
"failed to send failed response to api listener: {:?}",
e
);
}
}
}
}
Expand Down
59 changes: 59 additions & 0 deletions api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use mockall::{mock, predicate};
use multibase::Base;
use recon::Key;
use test_log::test;
use tokio::join;

struct Context;

Expand Down Expand Up @@ -61,6 +62,9 @@ pub const SIGNED_INIT_EVENT_PAYLOAD: &str = "uomRkYXRhoWVzdGVwaBkBTWZoZWFkZXKkY3
pub const UNSIGNED_INIT_EVENT_CID: &str =
"bafyreiakimdaub7m6inx2nljypdhvhu5vozjhylqukif4hjxt65qnkv6my";

// Assumes mainnet network
pub const UNSIGNED_INIT_EVENT_ID: &str = "CE010500C703887C2B8374ED63A8EB5B47190F4706AABE66017112200A43060A07ECF21B7D3569C3C67A9E9DABB293E170A2905E1D379FBB06AABE66";

pub const UNSIGNED_INIT_EVENT_CAR: &str = "
uOqJlcm9vdHOB2CpYJQABcRIgCkMGCgfs8ht9NWnDxnqenauyk-FwopBeHTefuwaqvmZndmVyc2lvbgHDAQFxEiAKQwYKB-zyG301acPGep6dq7KT4XCikF4dN5-7Bqq-ZqJkZGF0YfZmaGVhZGVypGNzZXBlbW9kZWxlbW9kZWxYKM4BAgGFARIghHTHRYxxeQXgc9Q6LUJVelzW5bnrw9TWgoBJlBIOVtdmdW5pcXVlR2Zvb3xiYXJrY29udHJvbGxlcnOBeDhkaWQ6a2V5Ono2TWt0Q0ZSY3dMUkZRQTlXYmVEUk03VzdrYkJkWlRIUTJ4blBneXhaTHExZ0NwSw";

Expand Down Expand Up @@ -238,6 +242,61 @@ async fn create_event() {
.unwrap();
assert!(matches!(resp, EventsPostResponse::Success));
}

// Regression test for #613: posting the same event twice (same event id)
// must succeed for BOTH requests. Previously the batching logic kept only
// one oneshot response channel per event id, so the duplicate caller's
// channel was dropped and its request errored.
#[test(tokio::test)]
async fn create_event_twice() {
let node_id = NodeId::random().0;
// NOTE(review): UNSIGNED_INIT_EVENT_ID is documented above as assuming
// the mainnet network, so the server must be built with Mainnet here.
let network = Network::Mainnet;
// Decode the hex-encoded EventId fixture; unwraps are fine in a test —
// a failure means the fixture itself is broken.
let expected_event_id =
EventId::try_from(hex::decode(UNSIGNED_INIT_EVENT_ID).unwrap()).unwrap();

// Remove whitespace from event CAR file
let event_data = UNSIGNED_INIT_EVENT_CAR
.chars()
.filter(|c| !c.is_whitespace())
.collect::<String>();
let mock_interest = MockAccessInterestStoreTest::new();
let mut mock_event_store = MockEventStoreTest::new();
let item = ApiItem::new(
expected_event_id,
decode_multibase_data(&event_data).unwrap(),
);
// Two identical items: both concurrent posts are expected to be batched
// into a single insert_many call carrying the duplicate.
let args = vec![item.clone(), item];

// Expect exactly one insert_many call with both duplicate items, and
// return an Ok result per input item (one result per submitted event).
mock_event_store
.expect_insert_many()
.with(predicate::eq(args), predicate::eq(node_id))
.times(1)
.returning(|input, _| {
Ok(input
.into_iter()
.map(|v| EventInsertResult::new_ok(v.key.clone()))
.collect())
});
let server = create_test_server(
node_id,
network,
mock_interest,
Arc::new(mock_event_store),
None,
);
// Submit the same event concurrently so both requests land in the same
// batch — this is what triggered the dropped-channel bug.
let (resp1, resp2) = join!(
server.events_post(
models::EventData {
data: event_data.to_string(),
},
&Context,
),
server.events_post(
models::EventData {
data: event_data.to_string(),
},
&Context,
),
);
// Both callers — including the duplicate — must receive Success.
assert_eq!(resp1.unwrap(), EventsPostResponse::Success);
assert_eq!(resp2.unwrap(), EventsPostResponse::Success);
}
#[test(tokio::test)]
async fn create_event_fails() {
let node_id = NodeId::random().0;
Expand Down
2 changes: 1 addition & 1 deletion core/src/event_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::network::Network;
const MIN_BYTES: [u8; 0] = [];
const MAX_BYTES: [u8; 1] = [0xFF];

#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Serialize, Deserialize)]
/// EventId is the event data as a recon key
pub struct EventId(#[serde(with = "serde_bytes")] Vec<u8>);

Expand Down

0 comments on commit 7160752

Please sign in to comment.