diff --git a/Cargo.toml b/Cargo.toml index 23a79f5..42d26d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,7 @@ readme = "./README.md" net = ["dep:tokio", "dep:futures-util"] [dependencies] -lib0 = ">= 0.16" -yrs = ">= 0.16" +yrs = "0.17" thiserror = "1.0" tokio = { version = "1.26.0", features = ["net", "sync"], optional = true } futures-util = { version = "0.3", features = ["sink"], optional = true } diff --git a/src/awareness.rs b/src/awareness.rs index fae3535..c05432d 100644 --- a/src/awareness.rs +++ b/src/awareness.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::Instant; use thiserror::Error; use yrs::block::ClientID; +use yrs::encoding::read; use yrs::updates::decoder::{Decode, Decoder}; use yrs::updates::encoder::{Encode, Encoder}; use yrs::{Doc, Observer, Subscription}; @@ -297,7 +298,7 @@ impl Encode for AwarenessUpdate { } impl Decode for AwarenessUpdate { - fn decode(decoder: &mut D) -> Result { + fn decode(decoder: &mut D) -> Result { let len: usize = decoder.read_var()?; let mut clients = HashMap::with_capacity(len); for _ in 0..len { diff --git a/src/net/broadcast.rs b/src/net/broadcast.rs index d4c475b..27a871f 100644 --- a/src/net/broadcast.rs +++ b/src/net/broadcast.rs @@ -1,15 +1,16 @@ +#![allow(dead_code)] use crate::awareness; -use crate::awareness::{Awareness, Event}; +use crate::awareness::Awareness; use crate::net::conn::handle_msg; use crate::sync::{DefaultProtocol, Error, Message, Protocol, MSG_SYNC, MSG_SYNC_UPDATE}; use futures_util::{SinkExt, StreamExt}; -use lib0::encoding::Write; use std::sync::Arc; use tokio::select; use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::{channel, Receiver, Sender}; use tokio::sync::{Mutex, RwLock}; use tokio::task::JoinHandle; +use yrs::encoding::write::Write; use yrs::updates::decoder::Decode; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; use yrs::UpdateSubscription; @@ -238,7 +239,7 @@ mod test { } fn test_channel(capacity: usize) -> (PollSender>, ReceiverStream>) { - let (s, r) = tokio::sync::mpsc::channel::>(1); + let (s, r) = tokio::sync::mpsc::channel::>(capacity); let s = PollSender::new(s); let r = ReceiverStream::new(r); (s, r) diff --git a/src/net/conn.rs b/src/net/conn.rs index cc73a7a..e58aedd 100644 --- a/src/net/conn.rs +++ b/src/net/conn.rs @@ -1,8 +1,8 @@ +#![allow(dead_code)] use crate::awareness::Awareness; use crate::sync::{DefaultProtocol, Error, Message, MessageReader, Protocol, SyncMessage}; use futures_util::sink::SinkExt; use futures_util::StreamExt; -use lib0::decoding::Cursor; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -11,6 +11,7 @@ use std::task::{Context, Poll}; use tokio::spawn; use tokio::sync::{Mutex, RwLock}; use tokio::task::JoinHandle; +use yrs::encoding::read::Cursor; use yrs::updates::decoder::{Decode, DecoderV1}; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; use yrs::Update; diff --git a/src/sync.rs b/src/sync.rs index a67e4c3..dae24fa 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,6 +1,7 @@ use crate::awareness; use crate::awareness::{Awareness, AwarenessUpdate}; use thiserror::Error; +use yrs::encoding::read; use yrs::updates::decoder::{Decode, Decoder}; use yrs::updates::encoder::{Encode, Encoder}; use yrs::{ReadTxn, StateVector, Transact, Update}; @@ -90,7 +91,7 @@ pub trait Protocol { /// send back [Error::PermissionDenied]. fn handle_auth( &self, - awareness: &Awareness, + _awareness: &Awareness, deny_reason: Option, ) -> Result, Error> { if let Some(reason) = deny_reason { @@ -122,9 +123,9 @@ pub trait Protocol { /// implemented here. By default it returns an [Error::Unsupported]. fn missing_handle( &self, - awareness: &mut Awareness, + _awareness: &mut Awareness, tag: u8, - data: Vec, + _data: Vec, ) -> Result, Error> { Err(Error::Unsupported(tag)) } @@ -183,7 +184,7 @@ impl Encode for Message { } impl Decode for Message { - fn decode(decoder: &mut D) -> Result { + fn decode(decoder: &mut D) -> Result { let tag: u8 = decoder.read_var()?; match tag { MSG_SYNC => { @@ -246,7 +247,7 @@ impl Encode for SyncMessage { } impl Decode for SyncMessage { - fn decode(decoder: &mut D) -> Result { + fn decode(decoder: &mut D) -> Result { let tag: u8 = decoder.read_var()?; match tag { MSG_SYNC_STEP_1 => { @@ -262,7 +263,7 @@ impl Decode for SyncMessage { let buf = decoder.read_buf()?; Ok(SyncMessage::Update(buf.into())) } - _ => Err(lib0::error::Error::UnexpectedValue), + _ => Err(read::Error::UnexpectedValue), } } } @@ -272,7 +273,7 @@ impl Decode for SyncMessage { pub enum Error { /// Incoming Y-protocol message couldn't be deserialized. #[error("failed to deserialize message: {0}")] - DecodingError(#[from] lib0::error::Error), + DecodingError(#[from] read::Error), /// Applying incoming Y-protocol awareness update has failed. #[error("failed to process awareness update: {0}")] @@ -286,6 +287,10 @@ pub enum Error { #[error("unsupported message tag identifier: {0}")] Unsupported(u8), + /// Thrown in case of I/O errors. + #[error("IO error: {0}")] + IO(#[from] std::io::Error), + /// Custom dynamic kind of error, usually related to a warp internal error messages. #[error("internal failure: {0}")] Other(#[from] Box), @@ -298,12 +303,6 @@ impl From for Error { } } -impl From for Error { - fn from(value: std::io::Error) -> Self { - Error::DecodingError(lib0::error::Error::IO(value)) - } -} - /// Since y-sync protocol enables for a multiple messages to be packed into a singe byte payload, /// [MessageReader] can be used over the decoder to read these messages one by one in iterable /// fashion. @@ -316,12 +315,12 @@ impl<'a, D: Decoder> MessageReader<'a, D> { } impl<'a, D: Decoder> Iterator for MessageReader<'a, D> { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { match Message::decode(self.0) { Ok(msg) => Some(Ok(msg)), - Err(lib0::error::Error::EndOfBuffer(_)) => None, + Err(read::Error::EndOfBuffer(_)) => None, Err(error) => Some(Err(error)), } } @@ -331,8 +330,8 @@ impl<'a, D: Decoder> Iterator for MessageReader<'a, D> { mod test { use crate::awareness::Awareness; use crate::sync::*; - use lib0::decoding::Cursor; use std::collections::HashMap; + use yrs::encoding::read::Cursor; use yrs::updates::decoder::{Decode, DecoderV1}; use yrs::updates::encoder::{Encode, EncoderV1}; use yrs::{Doc, GetString, ReadTxn, StateVector, Text, Transact};