Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #6 from Horusiath/yrs-v0.17-update
Browse files Browse the repository at this point in the history
updated code to match yrs v0.17
  • Loading branch information
Horusiath authored Nov 17, 2023
2 parents 56958e8 + 254d8ad commit c21259b
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 23 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ readme = "./README.md"
net = ["dep:tokio", "dep:futures-util"]

[dependencies]
lib0 = ">= 0.16"
yrs = ">= 0.16"
yrs = "0.17"
thiserror = "1.0"
tokio = { version = "1.26.0", features = ["net", "sync"], optional = true }
futures-util = { version = "0.3", features = ["sink"], optional = true }
Expand Down
3 changes: 2 additions & 1 deletion src/awareness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use yrs::block::ClientID;
use yrs::encoding::read;
use yrs::updates::decoder::{Decode, Decoder};
use yrs::updates::encoder::{Encode, Encoder};
use yrs::{Doc, Observer, Subscription};
Expand Down Expand Up @@ -297,7 +298,7 @@ impl Encode for AwarenessUpdate {
}

impl Decode for AwarenessUpdate {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, lib0::error::Error> {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, read::Error> {
let len: usize = decoder.read_var()?;
let mut clients = HashMap::with_capacity(len);
for _ in 0..len {
Expand Down
7 changes: 4 additions & 3 deletions src/net/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
#![allow(dead_code)]
use crate::awareness;
use crate::awareness::{Awareness, Event};
use crate::awareness::Awareness;
use crate::net::conn::handle_msg;
use crate::sync::{DefaultProtocol, Error, Message, Protocol, MSG_SYNC, MSG_SYNC_UPDATE};
use futures_util::{SinkExt, StreamExt};
use lib0::encoding::Write;
use std::sync::Arc;
use tokio::select;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use yrs::encoding::write::Write;
use yrs::updates::decoder::Decode;
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
use yrs::UpdateSubscription;
Expand Down Expand Up @@ -238,7 +239,7 @@ mod test {
}

fn test_channel(capacity: usize) -> (PollSender<Vec<u8>>, ReceiverStream<Vec<u8>>) {
let (s, r) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
let (s, r) = tokio::sync::mpsc::channel::<Vec<u8>>(capacity);
let s = PollSender::new(s);
let r = ReceiverStream::new(r);
(s, r)
Expand Down
3 changes: 2 additions & 1 deletion src/net/conn.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![allow(dead_code)]
use crate::awareness::Awareness;
use crate::sync::{DefaultProtocol, Error, Message, MessageReader, Protocol, SyncMessage};
use futures_util::sink::SinkExt;
use futures_util::StreamExt;
use lib0::decoding::Cursor;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
Expand All @@ -11,6 +11,7 @@ use std::task::{Context, Poll};
use tokio::spawn;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use yrs::encoding::read::Cursor;
use yrs::updates::decoder::{Decode, DecoderV1};
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
use yrs::Update;
Expand Down
31 changes: 15 additions & 16 deletions src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::awareness;
use crate::awareness::{Awareness, AwarenessUpdate};
use thiserror::Error;
use yrs::encoding::read;
use yrs::updates::decoder::{Decode, Decoder};
use yrs::updates::encoder::{Encode, Encoder};
use yrs::{ReadTxn, StateVector, Transact, Update};
Expand Down Expand Up @@ -90,7 +91,7 @@ pub trait Protocol {
/// send back [Error::PermissionDenied].
fn handle_auth(
&self,
awareness: &Awareness,
_awareness: &Awareness,
deny_reason: Option<String>,
) -> Result<Option<Message>, Error> {
if let Some(reason) = deny_reason {
Expand Down Expand Up @@ -122,9 +123,9 @@ pub trait Protocol {
/// implemented here. By default it returns an [Error::Unsupported].
fn missing_handle(
&self,
awareness: &mut Awareness,
_awareness: &mut Awareness,
tag: u8,
data: Vec<u8>,
_data: Vec<u8>,
) -> Result<Option<Message>, Error> {
Err(Error::Unsupported(tag))
}
Expand Down Expand Up @@ -183,7 +184,7 @@ impl Encode for Message {
}

impl Decode for Message {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, lib0::error::Error> {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, read::Error> {
let tag: u8 = decoder.read_var()?;
match tag {
MSG_SYNC => {
Expand Down Expand Up @@ -246,7 +247,7 @@ impl Encode for SyncMessage {
}

impl Decode for SyncMessage {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, lib0::error::Error> {
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, read::Error> {
let tag: u8 = decoder.read_var()?;
match tag {
MSG_SYNC_STEP_1 => {
Expand All @@ -262,7 +263,7 @@ impl Decode for SyncMessage {
let buf = decoder.read_buf()?;
Ok(SyncMessage::Update(buf.into()))
}
_ => Err(lib0::error::Error::UnexpectedValue),
_ => Err(read::Error::UnexpectedValue),
}
}
}
Expand All @@ -272,7 +273,7 @@ impl Decode for SyncMessage {
pub enum Error {
/// Incoming Y-protocol message couldn't be deserialized.
#[error("failed to deserialize message: {0}")]
DecodingError(#[from] lib0::error::Error),
DecodingError(#[from] read::Error),

/// Applying incoming Y-protocol awareness update has failed.
#[error("failed to process awareness update: {0}")]
Expand All @@ -286,6 +287,10 @@ pub enum Error {
#[error("unsupported message tag identifier: {0}")]
Unsupported(u8),

/// Thrown in case of I/O errors.
#[error("IO error: {0}")]
IO(#[from] std::io::Error),

/// Custom dynamic kind of error, usually related to a warp internal error messages.
#[error("internal failure: {0}")]
Other(#[from] Box<dyn std::error::Error + Send + Sync>),
Expand All @@ -298,12 +303,6 @@ impl From<tokio::task::JoinError> for Error {
}
}

impl From<std::io::Error> for Error {
fn from(value: std::io::Error) -> Self {
Error::DecodingError(lib0::error::Error::IO(value))
}
}

/// Since y-sync protocol enables for a multiple messages to be packed into a singe byte payload,
/// [MessageReader] can be used over the decoder to read these messages one by one in iterable
/// fashion.
Expand All @@ -316,12 +315,12 @@ impl<'a, D: Decoder> MessageReader<'a, D> {
}

impl<'a, D: Decoder> Iterator for MessageReader<'a, D> {
type Item = Result<Message, lib0::error::Error>;
type Item = Result<Message, read::Error>;

fn next(&mut self) -> Option<Self::Item> {
match Message::decode(self.0) {
Ok(msg) => Some(Ok(msg)),
Err(lib0::error::Error::EndOfBuffer(_)) => None,
Err(read::Error::EndOfBuffer(_)) => None,
Err(error) => Some(Err(error)),
}
}
Expand All @@ -331,8 +330,8 @@ impl<'a, D: Decoder> Iterator for MessageReader<'a, D> {
mod test {
use crate::awareness::Awareness;
use crate::sync::*;
use lib0::decoding::Cursor;
use std::collections::HashMap;
use yrs::encoding::read::Cursor;
use yrs::updates::decoder::{Decode, DecoderV1};
use yrs::updates::encoder::{Encode, EncoderV1};
use yrs::{Doc, GetString, ReadTxn, StateVector, Text, Transact};
Expand Down

0 comments on commit c21259b

Please sign in to comment.