Skip to content

Commit

Permalink
feat: add last_derivation_index to sync service
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Nov 24, 2024
1 parent 46d6f06 commit 449b37f
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 10 deletions.
26 changes: 22 additions & 4 deletions lib/core/src/persist/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use anyhow::Result;
use rusqlite::{Transaction, TransactionBehavior};
use std::str::FromStr;

use crate::sync::model::{data::LAST_DERIVATION_INDEX_DATA_ID, RecordType};

use super::Persister;

const KEY_SWAPPER_PROXY_URL: &str = "swapper_proxy_url";
const KEY_IS_FIRST_SYNC_COMPLETE: &str = "is_first_sync_complete";
const KEY_WEBHOOK_URL: &str = "webhook_url";
// TODO: The `last_derivation_index` needs to be synced
const KEY_LAST_DERIVATION_INDEX: &str = "last_derivation_index";
pub(crate) const KEY_LAST_DERIVATION_INDEX: &str = "last_derivation_index";

impl Persister {
fn get_cached_item_inner(tx: &Transaction, key: &str) -> Result<Option<String>> {
Expand All @@ -20,7 +21,11 @@ impl Persister {
Ok(res.ok())
}

fn update_cached_item_inner(tx: &Transaction, key: &str, value: String) -> Result<()> {
pub(crate) fn update_cached_item_inner(
tx: &Transaction,
key: &str,
value: String,
) -> Result<()> {
tx.execute(
"INSERT OR REPLACE INTO cached_items (key, value) VALUES (?1,?2)",
(key, value),
Expand Down Expand Up @@ -92,7 +97,20 @@ impl Persister {
}

pub fn set_last_derivation_index(&self, index: u32) -> Result<()> {
self.update_cached_item(KEY_LAST_DERIVATION_INDEX, index.to_string())
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

Self::update_cached_item_inner(&tx, KEY_LAST_DERIVATION_INDEX, index.to_string())?;
Self::commit_outgoing(
&tx,
LAST_DERIVATION_INDEX_DATA_ID,
RecordType::LastDerivationIndex,
// insert a mock updated field so that merging with incoming data works as expected
Some(vec!["last-derivation-index".to_string()]),
)?;

tx.commit()?;
Ok(())
}

pub fn get_last_derivation_index(&self) -> Result<Option<u32>> {
Expand Down
2 changes: 1 addition & 1 deletion lib/core/src/persist/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod address;
mod backup;
mod cache;
pub(crate) mod cache;
pub(crate) mod chain;
mod migrations;
pub(crate) mod receive;
Expand Down
42 changes: 40 additions & 2 deletions lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::collections::HashMap;
use anyhow::Result;
use rusqlite::{named_params, Connection, OptionalExtension, Row, Statement, TransactionBehavior};

use super::{PaymentState, Persister};
use super::{cache::KEY_LAST_DERIVATION_INDEX, PaymentState, Persister};
use crate::{
sync::model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData},
data::{ChainSyncData, ReceiveSyncData, SendSyncData, LAST_DERIVATION_INDEX_DATA_ID},
sync::Record,
RecordType, SyncOutgoingChanges, SyncSettings, SyncState,
},
Expand Down Expand Up @@ -688,4 +688,42 @@ impl Persister {

Ok(())
}

pub(crate) fn commit_incoming_address_index(
&self,
new_address_index: u32,
sync_state: SyncState,
last_commit_time: Option<u32>,
) -> Result<()> {
let mut con = self.get_connection()?;
let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?;

if let Some(last_commit_time) = last_commit_time {
Self::check_commit_update(
&tx,
&Record::get_id_from_record_type(
RecordType::LastDerivationIndex,
LAST_DERIVATION_INDEX_DATA_ID,
),
last_commit_time,
)?;
}

Self::update_cached_item_inner(
&tx,
KEY_LAST_DERIVATION_INDEX,
new_address_index.to_string(),
)?;

Self::set_sync_state_stmt(&tx)?.execute(named_params! {
":data_id": &sync_state.data_id,
":record_id": &sync_state.record_id,
":record_revision": &sync_state.record_revision,
":is_local": &sync_state.is_local,
})?;

tx.commit()?;

Ok(())
}
}
108 changes: 105 additions & 3 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use tokio::sync::watch;

use crate::sync::model::sync::{Record, SetRecordRequest, SetRecordStatus};
use crate::utils;
use crate::{persist::Persister, prelude::Signer};
use crate::{
persist::{cache::KEY_LAST_DERIVATION_INDEX, Persister},
prelude::Signer,
};

use self::client::SyncerClient;
use self::model::{
Expand Down Expand Up @@ -110,6 +113,9 @@ impl SyncService {
is_update,
last_commit_time,
)?,
SyncData::LastDerivationIndex(new_address_index) => self
.persister
.commit_incoming_address_index(new_address_index, sync_state, last_commit_time)?,
}
Ok(())
}
Expand Down Expand Up @@ -140,6 +146,12 @@ impl SyncService {
.into();
SyncData::Chain(chain_data)
}
RecordType::LastDerivationIndex => SyncData::LastDerivationIndex(
self.persister
.get_cached_item(KEY_LAST_DERIVATION_INDEX)?
.ok_or(anyhow!("Could not find last derivation index"))?
.parse()?,
),
};
Ok(data)
}
Expand Down Expand Up @@ -314,9 +326,9 @@ mod tests {
use tokio::sync::{mpsc, Mutex};

use crate::{
persist::Persister,
persist::{cache::KEY_LAST_DERIVATION_INDEX, Persister},
prelude::{Direction, PaymentState, Signer},
sync::model::SyncState,
sync::model::{data::LAST_DERIVATION_INDEX_DATA_ID, SyncState},
test_utils::{
chain_swap::new_chain_swap,
persist::{new_persister, new_receive_swap, new_send_swap},
Expand Down Expand Up @@ -630,4 +642,94 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_last_derivation_index_update() -> Result<()> {
let (_temp_dir, persister) = new_persister()?;
let persister = Arc::new(persister);

let signer: Arc<Box<dyn Signer>> = Arc::new(Box::new(MockSigner::new()));

let (incoming_tx, incoming_rx) = mpsc::channel::<Record>(10);
let outgoing_records = Arc::new(Mutex::new(HashMap::new()));
let client = Box::new(MockSyncerClient::new(incoming_rx, outgoing_records.clone()));
let sync_service =
SyncService::new("".to_string(), persister.clone(), signer.clone(), client);

// Check pull
assert_eq!(persister.get_cached_item(KEY_LAST_DERIVATION_INDEX)?, None);

let new_last_derivation_index = 10;
let data = SyncData::LastDerivationIndex(new_last_derivation_index);
incoming_tx
.send(Record::new(data, 0, signer.clone())?)
.await?;

sync_service.pull().await?;

assert_eq!(
persister.get_cached_item(KEY_LAST_DERIVATION_INDEX)?,
Some(new_last_derivation_index.to_string())
);

// Check push
let new_last_derivation_index = 20;
persister.set_last_derivation_index(new_last_derivation_index)?;

sync_service.push().await?;

let outgoing = outgoing_records.lock().await;
let record = get_outgoing_record(
persister.clone(),
&outgoing,
LAST_DERIVATION_INDEX_DATA_ID,
RecordType::LastDerivationIndex,
)?;
let decrypted_record = record.clone().decrypt(signer.clone())?;
match decrypted_record.data {
SyncData::LastDerivationIndex(last_derivation_index) => {
assert_eq!(last_derivation_index, new_last_derivation_index);
}
_ => {
return Err(anyhow::anyhow!("Unexpected sync data type received."));
}
}

// Check pull with merge
let new_local_last_derivation_index = 30;
persister.set_last_derivation_index(new_local_last_derivation_index)?;

let new_remote_last_derivation_index = 25;
let data = SyncData::LastDerivationIndex(new_remote_last_derivation_index);
incoming_tx
.send(Record::new(data, 0, signer.clone())?)
.await?;

sync_service.pull().await?;

// Newer one is persisted (local > remote)
assert_eq!(
persister.get_cached_item(KEY_LAST_DERIVATION_INDEX)?,
Some(new_local_last_derivation_index.to_string())
);

let new_local_last_derivation_index = 35;
persister.set_last_derivation_index(new_local_last_derivation_index)?;

let new_remote_last_derivation_index = 40;
let data = SyncData::LastDerivationIndex(new_remote_last_derivation_index);
incoming_tx
.send(Record::new(data, 2, signer.clone())?)
.await?;

sync_service.pull().await?;

// Newer one is persisted (remote > local)
assert_eq!(
persister.get_cached_item(KEY_LAST_DERIVATION_INDEX)?,
Some(new_remote_last_derivation_index.to_string())
);

Ok(())
}
}
10 changes: 10 additions & 0 deletions lib/core/src/sync/model/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};

use crate::prelude::{ChainSwap, Direction, ReceiveSwap, SendSwap};

pub(crate) const LAST_DERIVATION_INDEX_DATA_ID: &str = "last-derivation-index";

#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ChainSyncData {
pub(crate) swap_id: String,
Expand Down Expand Up @@ -155,6 +157,7 @@ pub(crate) enum SyncData {
Chain(ChainSyncData),
Send(SendSyncData),
Receive(ReceiveSyncData),
LastDerivationIndex(u32),
}

impl SyncData {
Expand All @@ -163,6 +166,7 @@ impl SyncData {
SyncData::Chain(chain_data) => &chain_data.swap_id,
SyncData::Send(send_data) => &send_data.swap_id,
SyncData::Receive(receive_data) => &receive_data.swap_id,
SyncData::LastDerivationIndex(_) => LAST_DERIVATION_INDEX_DATA_ID,
}
}

Expand All @@ -181,6 +185,12 @@ impl SyncData {
(SyncData::Receive(ref mut base), SyncData::Receive(other)) => {
base.merge(other, updated_fields)
}
(
SyncData::LastDerivationIndex(our_index),
SyncData::LastDerivationIndex(their_index),
) => {
*our_index = std::cmp::max(*their_index, *our_index);
}
_ => return Err(anyhow::anyhow!("Cannot merge data from two separate types")),
};
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions lib/core/src/sync/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub(crate) enum RecordType {
Receive = 0,
Send = 1,
Chain = 2,
LastDerivationIndex = 3,
}

impl ToSql for RecordType {
Expand All @@ -41,6 +42,7 @@ impl FromSql for RecordType {
0 => Ok(Self::Receive),
1 => Ok(Self::Send),
2 => Ok(Self::Chain),
3 => Ok(Self::LastDerivationIndex),
_ => Err(FromSqlError::OutOfRange(i)),
},
_ => Err(FromSqlError::InvalidType),
Expand Down Expand Up @@ -105,6 +107,7 @@ impl Record {
SyncData::Chain(_) => "chain-swap",
SyncData::Send(_) => "send-swap",
SyncData::Receive(_) => "receive-swap",
SyncData::LastDerivationIndex(_) => "wallet-address",
}
.to_string();
Self::id(prefix, data.id())
Expand All @@ -115,6 +118,7 @@ impl Record {
RecordType::Chain => "chain-swap",
RecordType::Send => "send-swap",
RecordType::Receive => "receive-swap",
RecordType::LastDerivationIndex => "wallet-address",
}
.to_string();
Self::id(prefix, data_id)
Expand Down

0 comments on commit 449b37f

Please sign in to comment.