Skip to content

Commit

Permalink
Serialization refactor: WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorak-mmk committed Nov 2, 2023
1 parent 25677ba commit d8a7168
Show file tree
Hide file tree
Showing 17 changed files with 531 additions and 246 deletions.
6 changes: 3 additions & 3 deletions scylla-cql/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::borrow::Cow;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

use scylla_cql::frame::request::SerializableRequest;
use scylla_cql::frame::value::SerializedValues;
use scylla_cql::frame::value::ValueList;
use scylla_cql::frame::{request::query, Compression, SerializedRequest};
use scylla_cql::types::serialize::row::NewSerializedValues;

fn make_query<'a>(contents: &'a str, values: &'a SerializedValues) -> query::Query<'a> {
fn make_query<'a>(contents: &'a str, values: &'a NewSerializedValues) -> query::Query<'a> {
query::Query {
contents: Cow::Borrowed(contents),
parameters: query::QueryParameters {
Expand All @@ -31,7 +31,7 @@ fn serialized_request_make_bench(c: &mut Criterion) {
&(1234, "a value", "i am storing a string", "dc0c8cd7-d954-47c1-8722-a857941c43fb").serialized().unwrap()
),
];
let queries = query_args.map(|(q, v)| make_query(q, v));
let queries = query_args.map(|(q, v)| make_query(q, todo!()));

for query in queries {
let query_size = query.to_bytes().unwrap().len();
Expand Down
10 changes: 10 additions & 0 deletions scylla-cql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::frame::frame_errors::{FrameError, ParseError};
use crate::frame::protocol_features::ProtocolFeatures;
use crate::frame::value::SerializeValuesError;
use crate::types::serialize::SerializationError;
use crate::Consistency;
use bytes::Bytes;
use std::io::ErrorKind;
Expand Down Expand Up @@ -340,6 +341,9 @@ pub enum BadQuery {
#[error("Serializing values failed: {0} ")]
SerializeValuesError(#[from] SerializeValuesError),

#[error("Serializing values failed: {0} ")]
SerializationError(#[from] SerializationError),

/// Serialized values are too long to compute partition key
#[error("Serialized values are too long to compute partition key! Length: {0}, Max allowed length: {1}")]
ValuesTooLongForKey(usize, usize),
Expand Down Expand Up @@ -443,6 +447,12 @@ impl From<SerializeValuesError> for QueryError {
}
}

impl From<SerializationError> for QueryError {
fn from(serialized_err: SerializationError) -> QueryError {
QueryError::BadQuery(BadQuery::SerializationError(serialized_err))
}
}

impl From<ParseError> for QueryError {
fn from(parse_error: ParseError) -> QueryError {
QueryError::InvalidMessage(format!("Error parsing message: {}", parse_error))
Expand Down
3 changes: 3 additions & 0 deletions scylla-cql/src/frame/frame_errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::response;
use crate::cql_to_rust::CqlTypeError;
use crate::frame::value::SerializeValuesError;
use crate::types::serialize::SerializationError;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -44,5 +45,7 @@ pub enum ParseError {
#[error(transparent)]
SerializeValuesError(#[from] SerializeValuesError),
#[error(transparent)]
SerializationError(#[from] SerializationError),
#[error(transparent)]
CqlTypeError(#[from] CqlTypeError),
}
36 changes: 26 additions & 10 deletions scylla-cql/src/frame/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ mod tests {
query::{Query, QueryParameters},
DeserializableRequest, SerializableRequest,
},
response::result::ColumnType,
types::{self, SerialConsistency},
value::SerializedValues,
},
types::serialize::row::NewSerializedValues,
Consistency,
};

Expand All @@ -129,8 +130,8 @@ mod tests {
page_size: Some(323),
paging_state: Some(vec![2, 1, 3, 7].into()),
values: {
let mut vals = SerializedValues::new();
vals.add_value(&2137).unwrap();
let mut vals = NewSerializedValues::new();
vals.add_value(&2137, &ColumnType::Int).unwrap();
Cow::Owned(vals)
},
};
Expand All @@ -156,9 +157,9 @@ mod tests {
page_size: None,
paging_state: None,
values: {
let mut vals = SerializedValues::new();
vals.add_named_value("the_answer", &42).unwrap();
vals.add_named_value("really?", &2137).unwrap();
let mut vals = NewSerializedValues::new();
vals.add_value(&42, &ColumnType::Int).unwrap();
vals.add_value(&2137, &ColumnType::Int).unwrap();
Cow::Owned(vals)
},
};
Expand Down Expand Up @@ -189,8 +190,18 @@ mod tests {

// Not execute's values, because named values are not supported in batches.
values: vec![
query.parameters.values.deref().clone(),
query.parameters.values.deref().clone(),
query
.parameters
.values
.deref()
.clone()
.into_old_serialized_values(),
query
.parameters
.values
.deref()
.clone()
.into_old_serialized_values(),
],
};
{
Expand All @@ -212,7 +223,7 @@ mod tests {
timestamp: None,
page_size: None,
paging_state: None,
values: Cow::Owned(SerializedValues::new()),
values: Cow::Owned(NewSerializedValues::new()),
};
let query = Query {
contents: contents.clone(),
Expand Down Expand Up @@ -261,7 +272,12 @@ mod tests {
serial_consistency: None,
timestamp: None,

values: vec![query.parameters.values.deref().clone()],
values: vec![query
.parameters
.values
.deref()
.clone()
.into_old_serialized_values()],
};
{
let mut buf = Vec::new();
Expand Down
24 changes: 14 additions & 10 deletions scylla-cql/src/frame/request/query.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::borrow::Cow;

use crate::frame::{frame_errors::ParseError, types::SerialConsistency};
use crate::{
frame::{frame_errors::ParseError, types::SerialConsistency},
types::serialize::row::NewSerializedValues,
};
use bytes::{Buf, BufMut, Bytes};

use crate::{
frame::request::{RequestOpcode, SerializableRequest},
frame::types,
frame::value::SerializedValues,
};

use super::DeserializableRequest;
Expand Down Expand Up @@ -61,7 +63,7 @@ pub struct QueryParameters<'a> {
pub timestamp: Option<i64>,
pub page_size: Option<i32>,
pub paging_state: Option<Bytes>,
pub values: Cow<'a, SerializedValues>,
pub values: Cow<'a, NewSerializedValues>,
}

impl Default for QueryParameters<'_> {
Expand All @@ -72,7 +74,7 @@ impl Default for QueryParameters<'_> {
timestamp: None,
page_size: None,
paging_state: None,
values: Cow::Borrowed(SerializedValues::EMPTY),
values: Cow::Owned(NewSerializedValues::new()),
}
}
}
Expand Down Expand Up @@ -102,10 +104,6 @@ impl QueryParameters<'_> {
flags |= FLAG_WITH_DEFAULT_TIMESTAMP;
}

if self.values.has_names() {
flags |= FLAG_WITH_NAMES_FOR_VALUES;
}

buf.put_u8(flags);

if !self.values.is_empty() {
Expand Down Expand Up @@ -151,10 +149,16 @@ impl<'q> QueryParameters<'q> {
let default_timestamp_flag = (flags & FLAG_WITH_DEFAULT_TIMESTAMP) != 0;
let values_have_names_flag = (flags & FLAG_WITH_NAMES_FOR_VALUES) != 0;

if values_have_names_flag {
return Err(ParseError::BadIncomingData(
"Named values in frame are currently unsupported".to_string(),
));
}

let values = Cow::Owned(if values_flag {
SerializedValues::new_from_frame(buf, values_have_names_flag)?
NewSerializedValues::new_from_frame(buf)?
} else {
SerializedValues::new()
NewSerializedValues::new()
});

let page_size = page_size_flag.then(|| types::read_int(buf)).transpose()?;
Expand Down
4 changes: 2 additions & 2 deletions scylla-cql/src/types/serialize/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{any::Any, sync::Arc};
use std::{error::Error, sync::Arc};

pub mod row;
pub mod value;

type SerializationError = Arc<dyn Any + Send + Sync>;
pub type SerializationError = Arc<dyn Error + Send + Sync>;

/// An interface that facilitates writing values for a CQL query.
pub trait RowWriter {
Expand Down
Loading

0 comments on commit d8a7168

Please sign in to comment.