Skip to content

Commit

Permalink
feat: add sync_outgoing
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Nov 18, 2024
1 parent 3e95eff commit 8fce375
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ x509-parser = { version = "0.16.0" }
tempfile = "3"
tonic = { version = "0.12.3", features = ["tls"] }
prost = "0.13.3"
uuid = { version = "1.8.0", features = ["v4"] }

[dev-dependencies]
lazy_static = "1.5.0"
Expand Down
157 changes: 156 additions & 1 deletion lib/core/src/persist/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use anyhow::Result;
use rusqlite::{named_params, Connection, OptionalExtension, Row, Statement, TransactionBehavior};

use super::Persister;
use crate::sync::model::{sync::Record, SyncOutgoingDetails, SyncSettings, SyncState};
use crate::{
sync::model::{sync::Record, RecordType, SyncOutgoingDetails, SyncSettings, SyncState},
utils,
};

impl Persister {
fn select_sync_state_query(where_clauses: Vec<String>) -> String {
Expand Down Expand Up @@ -187,4 +190,156 @@ impl Persister {
Ok(())
}

pub(crate) fn commit_new_outgoing(
con: &Connection,
data_id: &str,
record_type: RecordType,
) -> Result<()> {
let record_id = uuid::Uuid::new_v4().to_string();

con.execute(
"
INSERT INTO sync_outgoing(record_id, record_type, commit_time, updated_fields_json)
VALUES(:record_id, :record_type, :commit_time, :updated_fields_json)
",
named_params! {
":record_id": &record_id,
":record_type": record_type,
":commit_time": utils::now(),
":updated_fields_json": Option::<String>::None,
},
)?;

Self::set_sync_state_stmt(con)?.execute(named_params! {
":data_id": data_id,
":record_id": record_id,
":record_revision": 0,
":is_local": true,
})?;

Ok(())
}

pub(crate) fn commit_updated_fields(
con: &Connection,
data_id: &str,
record_type: RecordType,
updated_fields: Vec<String>,
) -> Result<()> {
let query = Self::select_sync_state_query(vec!["data_id = ?1".to_string()]);
let Some(sync_state) = con
.query_row(&query, [data_id], Self::sql_row_to_sync_state)
.optional()?
else {
return Err(anyhow::anyhow!("Cannot execute commit_updated_fields without a matching swap_state. For new records, use commit_new_outgoing"));
};

con.execute("
INSERT OR REPLACE INTO sync_outgoing(record_id, record_type, commit_time, updated_fields_json)
VALUES(
:record_id,
:record_type,
:commit_time,
json_insert(
COALESCE((SELECT updated_fields_json FROM sync_outgoing WHERE record_id = :record_id), '[]'),
'$[#]',
:updated_fields_json
)
)
",
named_params! {
":record_id": &sync_state.record_id,
":record_type": record_type,
":commit_time": utils::now(),
":updated_fields_json": serde_json::to_string(&updated_fields)?,
},
)?;

Ok(())
}

fn select_sync_outgoing_details_query(where_clauses: Vec<String>) -> String {
let mut where_clause_str = String::new();
if !where_clauses.is_empty() {
where_clause_str = String::from("WHERE ");
where_clause_str.push_str(where_clauses.join(" AND ").as_str());
}

format!(
"
SELECT
record_id,
record_type,
commit_time,
updated_fields_json
FROM sync_outgoing
{where_clause_str}
"
)
}

fn sql_row_to_sync_outgoing_details(row: &Row) -> Result<SyncOutgoingDetails> {
let record_id = row.get(0)?;
let record_type = row.get(1)?;
let commit_time = row.get(2)?;
let updated_fields = match row.get::<_, Option<String>>(3)? {
Some(fields) => Some(serde_json::from_str(&fields)?),
None => None,
};

Ok(SyncOutgoingDetails {
record_id,
record_type,
commit_time,
updated_fields,
})
}

pub(crate) fn get_sync_outgoing_details(&self) -> Result<Vec<SyncOutgoingDetails>> {
let con = self.get_connection()?;

let query = Self::select_sync_outgoing_details_query(vec![]);
let mut stmt = con.prepare(&query)?;
let mut rows = stmt.query([])?;

let mut outgoing_details = vec![];
while let Some(row) = rows.next()? {
let detail = Self::sql_row_to_sync_outgoing_details(row)?;
outgoing_details.push(detail);
}

Ok(outgoing_details)
}

pub(crate) fn get_sync_outgoing_details_by_id(
&self,
record_id: &str,
) -> Result<Option<SyncOutgoingDetails>> {
let con = self.get_connection()?;
let query =
Self::select_sync_outgoing_details_query(vec!["record_id = :record_id".to_string()]);
let mut stmt = con.prepare(&query)?;
let mut rows = stmt.query(named_params! {
":record_id": record_id,
})?;

if let Some(row) = rows.next()? {
return Ok(Some(Self::sql_row_to_sync_outgoing_details(row)?));
}

Ok(None)
}

pub(crate) fn remove_sync_outgoing_details(&self, record_id: &str) -> Result<()> {
let con = self.get_connection()?;

con.execute(
"DELETE FROM sync_outgoing WHERE record_id = :record_id",
named_params! {
":record_id": record_id
},
)?;

Ok(())
}
}
32 changes: 32 additions & 0 deletions lib/core/src/sync/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,32 @@ use rusqlite::{

pub(crate) mod sync;

#[derive(Copy, Clone)]
pub(crate) enum RecordType {
Receive = 0,
Send = 1,
Chain = 2,
}

impl ToSql for RecordType {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(rusqlite::types::ToSqlOutput::from(*self as i8))
}
}

impl FromSql for RecordType {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
match value {
ValueRef::Integer(i) => match i as u8 {
0 => Ok(Self::Receive),
1 => Ok(Self::Send),
2 => Ok(Self::Chain),
_ => Err(FromSqlError::OutOfRange(i)),
},
_ => Err(FromSqlError::InvalidType),
}
}
}

pub(crate) struct SyncState {
pub(crate) data_id: String,
Expand All @@ -18,3 +44,9 @@ pub(crate) struct SyncSettings {
pub(crate) latest_revision: Option<u64>,
}

pub(crate) struct SyncOutgoingDetails {
pub(crate) record_id: String,
pub(crate) record_type: RecordType,
pub(crate) commit_time: u32,
pub(crate) updated_fields: Option<Vec<String>>,
}

0 comments on commit 8fce375

Please sign in to comment.