Skip to content

Commit

Permalink
Merge pull request scylladb#1080 from muzarski/protocol-error-refactor
Browse files Browse the repository at this point in the history
errors: protocol error refactor
  • Loading branch information
wprzytula authored Oct 4, 2024
2 parents ea7c464 + 204f911 commit 2ac20a9
Show file tree
Hide file tree
Showing 10 changed files with 439 additions and 152 deletions.
4 changes: 2 additions & 2 deletions scylla/src/statement/prepared_statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::frame::types::{Consistency, SerialConsistency};
use crate::history::HistoryListener;
use crate::retry_policy::RetryPolicy;
use crate::routing::Token;
use crate::transport::errors::{BadQuery, QueryError};
use crate::transport::errors::{BadQuery, ProtocolError, QueryError};
use crate::transport::execution_profile::ExecutionProfileHandle;
use crate::transport::partitioner::{Partitioner, PartitionerHasher, PartitionerName};

Expand Down Expand Up @@ -241,7 +241,7 @@ impl PreparedStatement {
self.extract_partition_key(serialized_values)
.map_err(|err| match err {
PartitionKeyExtractionError::NoPkIndexValue(_, _) => {
QueryError::ProtocolError("No pk indexes - can't calculate token")
ProtocolError::PartitionKeyExtraction
}
})?;
let token = partition_key
Expand Down
73 changes: 42 additions & 31 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ use std::{
net::{Ipv4Addr, Ipv6Addr},
};

use super::errors::{ProtocolError, UseKeyspaceProtocolError};
use super::iterator::RowIterator;
use super::locator::tablets::{RawTablet, TabletParsingError};
use super::query_result::SingleRowTypedError;
use super::session::AddressTranslator;
use super::topology::{PeerEndpoint, UntranslatedEndpoint, UntranslatedPeer};
use super::NodeAddr;
Expand Down Expand Up @@ -299,10 +299,11 @@ impl NonErrorQueryResponse {
let (result, paging_state) = self.into_query_result_and_paging_state()?;

if !paging_state.finished() {
let error_msg = "Internal driver API misuse or a server bug: nonfinished paging state\
would be discarded by `NonErrorQueryResponse::into_query_result`";
error!(error_msg);
return Err(QueryError::ProtocolError(error_msg));
error!(
"Internal driver API misuse or a server bug: nonfinished paging state\
would be discarded by `NonErrorQueryResponse::into_query_result`"
);
return Err(ProtocolError::NonfinishedPagingState.into());
}

Ok(result)
Expand Down Expand Up @@ -904,7 +905,11 @@ impl Connection {
// Reprepared statement should keep its id - it's the md5 sum
// of statement contents
if reprepared.get_id() != previous_prepared.get_id() {
Err(UserRequestError::RepreparedIdChanged)
Err(UserRequestError::RepreparedIdChanged {
statement: reprepare_query.contents,
expected_id: previous_prepared.get_id().clone().into(),
reprepared_id: reprepared.get_id().clone().into(),
})
} else {
Ok(())
}
Expand Down Expand Up @@ -1284,17 +1289,16 @@ impl Connection {
self.reprepare(p.get_statement(), p).await?;
continue;
} else {
return Err(QueryError::ProtocolError(
"The server returned a prepared statement Id that did not exist in the batch",
));
return Err(ProtocolError::RepreparedIdMissingInBatch.into());
}
}
_ => Err(err.into()),
},
Response::Result(_) => Ok(query_response.into_query_result()?),
_ => Err(QueryError::ProtocolError(
"BATCH: Unexpected server response",
)),
_ => Err(ProtocolError::UnexpectedResponse(
query_response.response.to_response_kind(),
)
.into()),
};
}
}
Expand Down Expand Up @@ -1359,23 +1363,40 @@ impl Connection {
};

let query_response = self.query_raw_unpaged(&query, PagingState::start()).await?;
Self::verify_use_keyspace_result(keyspace_name, query_response)
}

fn verify_use_keyspace_result(
keyspace_name: &VerifiedKeyspaceName,
query_response: QueryResponse,
) -> Result<(), QueryError> {
match query_response.response {
Response::Result(result::Result::SetKeyspace(set_keyspace)) => {
if set_keyspace.keyspace_name.to_lowercase()
!= keyspace_name.as_str().to_lowercase()
if !set_keyspace
.keyspace_name
.eq_ignore_ascii_case(keyspace_name.as_str())
{
return Err(QueryError::ProtocolError(
"USE <keyspace_name> returned response with different keyspace name",
));
let expected_keyspace_name_lowercase = keyspace_name.as_str().to_lowercase();
let result_keyspace_name_lowercase = set_keyspace.keyspace_name.to_lowercase();

return Err(ProtocolError::UseKeyspace(
UseKeyspaceProtocolError::KeyspaceNameMismatch {
expected_keyspace_name_lowercase,
result_keyspace_name_lowercase,
},
)
.into());
}

Ok(())
}
Response::Error(err) => Err(err.into()),
_ => Err(QueryError::ProtocolError(
"USE <keyspace_name> returned unexpected response",
)),
_ => Err(
ProtocolError::UseKeyspace(UseKeyspaceProtocolError::UnexpectedResponse(
query_response.response.to_response_kind(),
))
.into(),
),
}
}

Expand Down Expand Up @@ -1425,17 +1446,7 @@ impl Connection {
.query_unpaged(LOCAL_VERSION)
.await?
.single_row_typed()
.map_err(|err| match err {
SingleRowTypedError::RowsExpected(_) => {
QueryError::ProtocolError("Version query returned not rows")
}
SingleRowTypedError::BadNumberOfRows(_) => {
QueryError::ProtocolError("system.local query returned a wrong number of rows")
}
SingleRowTypedError::FromRowError(_) => {
QueryError::ProtocolError("Row is not uuid type as it should be")
}
})?;
.map_err(ProtocolError::SchemaVersionFetch)?;
Ok(version_id)
}

Expand Down
6 changes: 4 additions & 2 deletions scylla/src/transport/downgrading_consistency_retry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ mod tests {
use bytes::Bytes;

use crate::test_utils::setup_tracing;
use crate::transport::errors::{BadQuery, BrokenConnectionErrorKind, ConnectionPoolError};
use crate::transport::errors::{
BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError,
};

use super::*;

Expand Down Expand Up @@ -284,7 +286,7 @@ mod tests {
cl,
);
downgrading_consistency_policy_assert_never_retries(
QueryError::ProtocolError("test"),
ProtocolError::NonfinishedPagingState.into(),
cl,
);
}
Expand Down
Loading

0 comments on commit 2ac20a9

Please sign in to comment.